Flink KeyedProcessFunction Ordering
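I'm new to Flink and trying to understand how Flink orders calls to processElement() in its KeyedProcessFunction abstraction under parallelism. Consider this example, which produces a stream of partial sums: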
package sample

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

object Playground {

  case class Record(groupId: String, score: Int) {}

  def main(args: Array[String]): Unit = {
    // 1. Create the environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
    env.setParallelism(10)

    // 2. Source
    val record1 = Record("groupX", 1)
    val record2 = Record("groupX", 2)
    val record3 = Record("groupX", 3)
    val records: DataStream[Record] = env.fromElements(record1, record2, record3, record1, record2, record3)

    // 3. Application Logic
    val partialSums: DataStream[Int] = records
      .keyBy(record => record.groupId)
      .process(new KeyedProcessFunction[String, Record, Int] {
        // Store partial sum of score for Records seen
        lazy val partialSum: ValueState[Int] = getRuntimeContext.getState(
          new ValueStateDescriptor[Int]("partialSum", classOf[Int]))

        // Ingest new record
        override def processElement(value: Record,
                                    ctx: KeyedProcessFunction[String, Record, Int]#Context,
                                    out: Collector[Int]): Unit = {
          val currentSum: Int = partialSum.value()
          partialSum.update(currentSum + value.score)
          out.collect(partialSum.value())
        }
      })

    // 4. Sink
    partialSums.print()

    // 5. Build JobGraph and execute
    env.execute("sample-job")
  }
}
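I expect the output to be the stream 1, 3, 6, 7, 9, 12, and indeed it is here.
Is it safe to assume this will always be the case, especially when reading from a source with a high degree of parallelism?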
In your example, the order within each key is guaranteed. Since there is only one key, you will always get 1, 3, 6, 7, 9, 12.
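To make the arithmetic concrete, here is a minimal plain-Scala sketch of what the keyed state does with records that arrive in order. It does not use Flink: the mutable map is only a stand-in for the per-key ValueState, and the names PartialSums and partialSums are made up for this illustration.

```scala
object PartialSums {
  final case class Record(groupId: String, score: Int)

  // Emits the running sum for each record's key, in arrival order.
  // The map is a hypothetical stand-in for Flink's per-key ValueState.
  def partialSums(records: Seq[Record]): Seq[Int] = {
    val state = scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)
    records.map { r =>
      val updated = state(r.groupId) + r.score
      state(r.groupId) = updated
      updated
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(1, 2, 3, 1, 2, 3).map(score => Record("groupX", score))
    println(partialSums(input).mkString(", ")) // prints: 1, 3, 6, 7, 9, 12
  }
}
```

The result depends only on the order in which records for the same key arrive, which is the property the guarantee above is about.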
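When you read from a source with a parallelism greater than 1, the source instances race against each other. When streams from two or more sources are combined (e.g., via keyBy, union, rebalance, etc.), the result is non-deterministic (but the events from each source keep their relative order).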
For example, if you have
stream X: 1, 2, 3, 4
stream Y: a, b, c, d
and then bring these two streams together, you might get 1, 2, 3, 4, a, b, c, d, or a, b, 1, 2, 3, c, 4, d, and so on.
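The property being described can be checked with a small plain-Scala sketch (again not Flink code; Interleaving and keepsSourceOrder are hypothetical names for this illustration). It assumes every element is distinct, as in the X and Y streams above, and tests whether a merged sequence still contains each source's elements in their original relative order.

```scala
object Interleaving {
  // True if the elements of `source` appear in `merged` in their original
  // relative order. Assumes all elements are distinct.
  def keepsSourceOrder[A](merged: Seq[A], source: Seq[A]): Boolean =
    merged.filter(e => source.contains(e)) == source

  def main(args: Array[String]): Unit = {
    val x = Seq("1", "2", "3", "4")
    val y = Seq("a", "b", "c", "d")

    val candidates = Seq(
      Seq("1", "2", "3", "4", "a", "b", "c", "d"), // possible
      Seq("a", "b", "1", "2", "3", "c", "4", "d"), // possible
      Seq("2", "1", "3", "4", "a", "b", "c", "d")  // not possible: X is reordered
    )

    candidates.foreach { merged =>
      val ok = keepsSourceOrder(merged, x) && keepsSourceOrder(merged, y)
      println(s"${merged.mkString(", ")} -> $ok")
    }
  }
}
```

Any interleaving that passes both checks is a possible outcome; which one you actually see depends on how the source instances happen to race.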