FlinkKafkaConsumer 中的事件时间顺序保证

Guarantee of event-time order in FlinkKafkaConsumer

TL;DR: 目前在 Flink 中保证事件的事件时间顺序的最佳解决方案是什么?

我使用 Flink 1.8.0 和 Kafka 2.2.1。我需要通过事件时间戳来保证事件的正确顺序。我每 1 秒生成一次周期性水印。我将 FlinkKafkaConsumer 与 AscendingTimestampExtractor 一起使用:

val rawConsumer = new FlinkKafkaConsumer[T](topicName, deserializationSchema, kafkaConsumerConfig)
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[T] {
      override def extractAscendingTimestamp(element: T): Long =
        timestampExtractor(element)
      })
 .addSource(consumer)(deserializationSchema.getProducedType).uid(sourceId).name(sourceId)

然后处理:

myStream
   .keyBy(ev => (ev.name, ev.group))
   .mapWithState[ResultEvent, ResultEvent](DefaultCalculator.calculateResultEventState)

我意识到,对于在同一毫秒或几毫秒后出现的无序事件,Flink 不会更正顺序。我在文档中找到的内容:

the watermark triggers computation of all windows where the maximum timestamp (which is end-timestamp - 1) is smaller than the new watermark

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#interaction-of-watermarks-and-windows

所以我准备了额外的处理步骤来保证事件时间顺序:

myStream
      .timeWindowAll(Time.milliseconds(100))
      .apply((window, input, out: Collector[MyEvent]) => input
        .toList.sortBy(_.getTimestamp)
        .foreach(out.collect) // this windowing guarantee correct order by event time
      )(TypeInformation.of(classOf[MyEvent]))
      .keyBy(ev => (ev.name, ev.group))
      .mapWithState[ResultEvent, ResultEvent](DefaultScoring.calculateResultEventState)

但是,我发现这个解决方案很丑陋,它看起来像是一种解决方法。我也很关心per-partition watermarks of KafkaSource

理想情况下,我想将顺序保证放在 KafkaSource 中,并为每个 kafka 分区保留它,例如每个分区的水印。有可能这样做吗? 目前Flink中保证事件顺序的最佳方案是什么?

这是一个很好的观点。 KafkaSource中的顺序保证其实包括两部分。

  1. 保证同一子任务中分区之间的顺序。
  2. 保证子任务之间的顺序。

第一部分已在 https://issues.apache.org/jira/browse/FLINK-12675 中进行。而第二部分需要子任务之间共享状态的支持,这可能需要社区更多的讨论和详细的计划。

回到你的问题,我认为通过设置一个window来缓冲数据来保持事件的顺序是目前最好的解决方案。

Flink 不保证按事件时间顺序处理记录。分区内的记录将按其原始顺序处理,但当两个或多个分区合并到一个新分区时(由于流的重新分区或联合),Flink 会随机将这些分区的记录合并到新分区中。其他一切都会效率低下并导致更高的延迟。

例如,如果您的作业有一个从两个 Kafka 分区读取的源任务,则两个分区的记录将以某种随机的之字形模式合并。

但是,对于生成的水印,Flink 保证所有事件都得到正确处理。这意味着,水印永远不会超过记录。例如,如果您的 Kafka 源生成每个分区的水印,即使在合并多个分区的记录后,水印仍然有效。水印用于收集和处理时间戳小于水印的所有记录。因此,它确保了输入数据的完整性。

这是按时间戳对记录排序的先决条件。您可以通过翻滚 window 来做到这一点。但是,您应该知道

  1. a window 全部将在单个任务中执行(即,它不是并行化的)。如果每个键的顺序足够,您应该使用常规的翻滚 window 或者甚至更好地实现 KeyedProcessFunction,这样效率更高。
  2. 由于重新分区或更改并行度而重组流时,顺序将被破坏。