Apache Flink - 实现一个状态可能非常大的流处理器
Apache Flink - implementing a stream processor with potentially very large state
我希望从事件流中投射出一个潜在的非常大的状态。这就是我以命令式方式实现它的方式:
class ImperativeFooProcessor {
val state: mutable.Map[UUID, BarState] = mutable.HashMap.empty[UUID, BarState]
def handle(event: InputEvent) = {
event match {
case FooAdded(fooId, barId) => {
// retrieve relevant state and do some work on it
val barState = state(barId)
// let the world know about what may have happened
publish(BarOccured(fooId, barId))
// or maybe rather
publish(BazOccured(fooId, barId))
}
case FooRemoved(fooId, barId) => {
// retrieve relevant state and do some work on it
val barState = state(barId)
// let the world know about what may have happened
publish(BarOccured(fooId, barId))
// or maybe rather
publish(BazOccured(fooId, barId))
}
}
}
private def publish(event: OutputEvent): Unit = {
// push event to downstream sink
}
}
在最坏的情况下,BarState 的大小会随着它被 FooAdded
提及的次数而增长
相对于每个 barId 的事件总数,唯一 barId 的数量非常少。
我将如何开始在 Flink 中表示这种处理结构?
我如何处理每个 BarState 可能变得非常大的事实?
Flink 在所谓的状态后端中维护状态。有在工作进程的 JVM 堆上运行的状态后端(MemoryStateBackend
和 FsStateBackend
)。这些后端不适合处理大型状态。
Flink 还有一个基于 RocksDB 的 RocksDBStateBackend。 RocksDB 在每个工作节点上作为本地数据库(无需设置为外部服务),并将状态数据写入磁盘。因此,它可以处理超出内存的非常大的状态。
Flink 提供了一个KeyedStream
,它是一个根据特定属性分区的流。在您的情况下,您可能希望对同一 id 的所有访问都进入同一状态实例,因此您将使用 barId
作为键。然后,状态基于 barId
在所有并行工作线程之间进行分区。这基本上是一个分布式键值存储或映射。所以你不需要将状态表示为地图,因为它是由 Flink 自动分发的。
我希望从事件流中投射出一个潜在的非常大的状态。这就是我以命令式方式实现它的方式:
class ImperativeFooProcessor {
val state: mutable.Map[UUID, BarState] = mutable.HashMap.empty[UUID, BarState]
def handle(event: InputEvent) = {
event match {
case FooAdded(fooId, barId) => {
// retrieve relevant state and do some work on it
val barState = state(barId)
// let the world know about what may have happened
publish(BarOccured(fooId, barId))
// or maybe rather
publish(BazOccured(fooId, barId))
}
case FooRemoved(fooId, barId) => {
// retrieve relevant state and do some work on it
val barState = state(barId)
// let the world know about what may have happened
publish(BarOccured(fooId, barId))
// or maybe rather
publish(BazOccured(fooId, barId))
}
}
}
private def publish(event: OutputEvent): Unit = {
// push event to downstream sink
}
}
在最坏的情况下,BarState 的大小会随着它被 FooAdded
相对于每个 barId 的事件总数,唯一 barId 的数量非常少。
我将如何开始在 Flink 中表示这种处理结构?
我如何处理每个 BarState 可能变得非常大的事实?
Flink 在所谓的状态后端中维护状态。有在工作进程的 JVM 堆上运行的状态后端(MemoryStateBackend
和 FsStateBackend
)。这些后端不适合处理大型状态。
Flink 还有一个基于 RocksDB 的 RocksDBStateBackend。 RocksDB 在每个工作节点上作为本地数据库(无需设置为外部服务),并将状态数据写入磁盘。因此,它可以处理超出内存的非常大的状态。
Flink 提供了一个KeyedStream
,它是一个根据特定属性分区的流。在您的情况下,您可能希望对同一 id 的所有访问都进入同一状态实例,因此您将使用 barId
作为键。然后,状态基于 barId
在所有并行工作线程之间进行分区。这基本上是一个分布式键值存储或映射。所以你不需要将状态表示为地图,因为它是由 Flink 自动分发的。