FlinkKafkaConsumer 中的事件时间顺序保证
Guarantee of event-time order in FlinkKafkaConsumer
TL;DR: 目前在 Flink 中保证事件的事件时间顺序的最佳解决方案是什么?
我使用 Flink 1.8.0 和 Kafka 2.2.1。我需要通过事件时间戳来保证事件的正确顺序。我每 1 秒生成一次周期性水印。我将 FlinkKafkaConsumer 与 AscendingTimestampExtractor 一起使用:
val rawConsumer = new FlinkKafkaConsumer[T](topicName, deserializationSchema, kafkaConsumerConfig)
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[T] {
override def extractAscendingTimestamp(element: T): Long =
timestampExtractor(element)
})
.addSource(consumer)(deserializationSchema.getProducedType).uid(sourceId).name(sourceId)
然后处理:
myStream
.keyBy(ev => (ev.name, ev.group))
.mapWithState[ResultEvent, ResultEvent](DefaultCalculator.calculateResultEventState)
我意识到,对于在同一毫秒或几毫秒后出现的无序事件,Flink 不会更正顺序。我在文档中找到的内容:
the watermark triggers computation of all windows where the maximum timestamp (which is end-timestamp - 1) is smaller than the new watermark
所以我准备了额外的处理步骤来保证事件时间顺序:
myStream
.timeWindowAll(Time.milliseconds(100))
.apply((window, input, out: Collector[MyEvent]) => input
.toList.sortBy(_.getTimestamp)
.foreach(out.collect) // this windowing guarantee correct order by event time
)(TypeInformation.of(classOf[MyEvent]))
.keyBy(ev => (ev.name, ev.group))
.mapWithState[ResultEvent, ResultEvent](DefaultScoring.calculateResultEventState)
但是,我发现这个解决方案很丑陋,它看起来像是一种解决方法。我也很关心per-partition watermarks of KafkaSource
理想情况下,我想将顺序保证放在 KafkaSource 中,并为每个 kafka 分区保留它,例如每个分区的水印。有可能这样做吗? 目前Flink中保证事件顺序的最佳方案是什么?
这是一个很好的观点。 KafkaSource中的顺序保证其实包括两部分。
- 保证同一子任务中分区之间的顺序。
- 保证子任务之间的顺序。
第一部分已在 https://issues.apache.org/jira/browse/FLINK-12675 中进行。而第二部分需要子任务之间共享状态的支持,这可能需要社区更多的讨论和详细的计划。
回到你的问题,我认为通过设置一个window来缓冲数据来保持事件的顺序是目前最好的解决方案。
Flink 不保证按事件时间顺序处理记录。分区内的记录将按其原始顺序处理,但当两个或多个分区合并到一个新分区时(由于流的重新分区或联合),Flink 会随机将这些分区的记录合并到新分区中。其他一切都会效率低下并导致更高的延迟。
例如,如果您的作业有一个从两个 Kafka 分区读取的源任务,则两个分区的记录将以某种随机的之字形模式合并。
但是,对于生成的水印,Flink 保证所有事件都得到正确处理。这意味着,水印永远不会超过记录。例如,如果您的 Kafka 源生成每个分区的水印,即使在合并多个分区的记录后,水印仍然有效。水印用于收集和处理时间戳小于水印的所有记录。因此,它确保了输入数据的完整性。
这是按时间戳对记录排序的先决条件。您可以通过翻滚 window 来做到这一点。但是,您应该知道
- a window 全部将在单个任务中执行(即,它不是并行化的)。如果每个键的顺序足够,您应该使用常规的翻滚 window 或者甚至更好地实现
KeyedProcessFunction
,这样效率更高。
- 由于重新分区或更改并行度而重组流时,顺序将被破坏。
TL;DR: 目前在 Flink 中保证事件的事件时间顺序的最佳解决方案是什么?
我使用 Flink 1.8.0 和 Kafka 2.2.1。我需要通过事件时间戳来保证事件的正确顺序。我每 1 秒生成一次周期性水印。我将 FlinkKafkaConsumer 与 AscendingTimestampExtractor 一起使用:
val rawConsumer = new FlinkKafkaConsumer[T](topicName, deserializationSchema, kafkaConsumerConfig)
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[T] {
override def extractAscendingTimestamp(element: T): Long =
timestampExtractor(element)
})
.addSource(consumer)(deserializationSchema.getProducedType).uid(sourceId).name(sourceId)
然后处理:
myStream
.keyBy(ev => (ev.name, ev.group))
.mapWithState[ResultEvent, ResultEvent](DefaultCalculator.calculateResultEventState)
我意识到,对于在同一毫秒或几毫秒后出现的无序事件,Flink 不会更正顺序。我在文档中找到的内容:
the watermark triggers computation of all windows where the maximum timestamp (which is end-timestamp - 1) is smaller than the new watermark
所以我准备了额外的处理步骤来保证事件时间顺序:
myStream
.timeWindowAll(Time.milliseconds(100))
.apply((window, input, out: Collector[MyEvent]) => input
.toList.sortBy(_.getTimestamp)
.foreach(out.collect) // this windowing guarantee correct order by event time
)(TypeInformation.of(classOf[MyEvent]))
.keyBy(ev => (ev.name, ev.group))
.mapWithState[ResultEvent, ResultEvent](DefaultScoring.calculateResultEventState)
但是,我发现这个解决方案很丑陋,它看起来像是一种解决方法。我也很关心per-partition watermarks of KafkaSource
理想情况下,我想将顺序保证放在 KafkaSource 中,并为每个 kafka 分区保留它,例如每个分区的水印。有可能这样做吗? 目前Flink中保证事件顺序的最佳方案是什么?
这是一个很好的观点。 KafkaSource中的顺序保证其实包括两部分。
- 保证同一子任务中分区之间的顺序。
- 保证子任务之间的顺序。
第一部分已在 https://issues.apache.org/jira/browse/FLINK-12675 中进行。而第二部分需要子任务之间共享状态的支持,这可能需要社区更多的讨论和详细的计划。
回到你的问题,我认为通过设置一个window来缓冲数据来保持事件的顺序是目前最好的解决方案。
Flink 不保证按事件时间顺序处理记录。分区内的记录将按其原始顺序处理,但当两个或多个分区合并到一个新分区时(由于流的重新分区或联合),Flink 会随机将这些分区的记录合并到新分区中。其他一切都会效率低下并导致更高的延迟。
例如,如果您的作业有一个从两个 Kafka 分区读取的源任务,则两个分区的记录将以某种随机的之字形模式合并。
但是,对于生成的水印,Flink 保证所有事件都得到正确处理。这意味着,水印永远不会超过记录。例如,如果您的 Kafka 源生成每个分区的水印,即使在合并多个分区的记录后,水印仍然有效。水印用于收集和处理时间戳小于水印的所有记录。因此,它确保了输入数据的完整性。
这是按时间戳对记录排序的先决条件。您可以通过翻滚 window 来做到这一点。但是,您应该知道
- a window 全部将在单个任务中执行(即,它不是并行化的)。如果每个键的顺序足够,您应该使用常规的翻滚 window 或者甚至更好地实现
KeyedProcessFunction
,这样效率更高。 - 由于重新分区或更改并行度而重组流时,顺序将被破坏。