Kafka Streams:处理来自不同分区的消息时的事件时间偏差
Kafka Streams: event-time skew when processing messages from different partitions
让我们考虑一个主题,该主题具有多个分区和按事件时间顺序编写的消息,没有任何特定的分区方案。 Kafka Streams 应用程序对这些消息进行一些转换,然后按某个键分组,然后按事件时间 window 和给定的宽限期聚合消息。
每个任务可以以不同的速度处理传入的消息(例如,因为 运行 在具有不同性能特征的服务器上)。这意味着在 groupBy shuffle 之后,当消息来自不同任务时,事件时间顺序将不会在内部主题的同一分区中的消息之间保留。一段时间后,此事件时间偏差可能会变得大于宽限期,这将导致丢弃来自滞后任务的消息。
增加宽限期似乎不是一个有效的选项,因为它会延迟发出最终聚合结果。 Apache Flink 通过在分区合并时发出最低水印来处理这个问题。
这应该是一个真正的问题,尤其是在处理大量历史数据时,还是我错过了什么? Kafka Streams 是否提供解决方案来处理这种情况?
UPDATE 我的问题不是关于 KStream-KStream 连接,而是关于在流洗牌之前的单个 KStream 事件时间聚合。
考虑这个代码片段:
stream
.mapValues(...)
.groupBy(...)
.windowedBy(TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(10)))
.aggregate(...)
我假设某些任务的 mapValues() 操作可能出于某种原因而变慢,并且由于这些任务确实以不同的速度处理消息。当在 aggregate()
运算符处发生洗牌时,任务 0 可能已经处理了时间 t
的消息,而任务 1 仍在 t-skew
,但是来自两个任务的消息最终交错在一个单一的内部主题的分区(对应分组键)。
我担心的是,当偏差足够大时(在我的示例中超过 10 秒),来自滞后任务 1 的消息将被丢弃。
基本上,task/processor 维护一个 stream-time,它被定义为已轮询的任何记录的最高时间戳。这个 stream-time 然后在 Kafka Streams 中用于不同的目的(例如:Punctator、Windowded Aggregation 等)。
[窗口聚合]
如您所述,流时间用于确定是否应接受记录,即 record_accepted = end_window_time(current record) + grace_period > observed stream_time
.
正如您所描述的,如果多个任务 运行 并行地根据分组键随机播放消息,并且某些任务比其他任务慢(或某些分区处于离线状态),这将创建 out-of-订购信息。不幸的是,恐怕解决这个问题的唯一方法是增加 grace_period
.
这其实就是可用性和一致性之间永恒的权衡。
[KafkaStream 和 KafkaStream/KTable 加入的行为
当您使用 Kafka Streams 执行连接操作时,内部任务会分配给多个共同分区主题的“相同”分区。例如,任务 0 将分配给 Topic1-Partition0 和 TopicB-Partition0。
获取的记录按分区缓冲到由任务管理的内部队列中。因此,每个队列包含等待处理的单个分区的所有记录。
然后,记录从队列中一个接一个地轮询,并由拓扑实例处理。但是,这是从轮询返回的具有最低时间戳的非空队列中的记录。
此外,如果队列为空,则任务可能会在一段时间内空闲,从而不再从队列中轮询记录。您实际上可以配置任务保持空闲的最长时间,可以使用流配置来定义:max.task.idle.ms
此机制允许同步共同定位的分区。但是,默认 max.task.idle.ms
设置为 0。这意味着任务永远不会等待来自分区的更多数据,这可能会导致记录被过滤,因为流时间可能会增加得更快。
让我们考虑一个主题,该主题具有多个分区和按事件时间顺序编写的消息,没有任何特定的分区方案。 Kafka Streams 应用程序对这些消息进行一些转换,然后按某个键分组,然后按事件时间 window 和给定的宽限期聚合消息。
每个任务可以以不同的速度处理传入的消息(例如,因为 运行 在具有不同性能特征的服务器上)。这意味着在 groupBy shuffle 之后,当消息来自不同任务时,事件时间顺序将不会在内部主题的同一分区中的消息之间保留。一段时间后,此事件时间偏差可能会变得大于宽限期,这将导致丢弃来自滞后任务的消息。
增加宽限期似乎不是一个有效的选项,因为它会延迟发出最终聚合结果。 Apache Flink 通过在分区合并时发出最低水印来处理这个问题。
这应该是一个真正的问题,尤其是在处理大量历史数据时,还是我错过了什么? Kafka Streams 是否提供解决方案来处理这种情况?
UPDATE 我的问题不是关于 KStream-KStream 连接,而是关于在流洗牌之前的单个 KStream 事件时间聚合。
考虑这个代码片段:
stream
.mapValues(...)
.groupBy(...)
.windowedBy(TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(10)))
.aggregate(...)
我假设某些任务的 mapValues() 操作可能出于某种原因而变慢,并且由于这些任务确实以不同的速度处理消息。当在 aggregate()
运算符处发生洗牌时,任务 0 可能已经处理了时间 t
的消息,而任务 1 仍在 t-skew
,但是来自两个任务的消息最终交错在一个单一的内部主题的分区(对应分组键)。
我担心的是,当偏差足够大时(在我的示例中超过 10 秒),来自滞后任务 1 的消息将被丢弃。
基本上,task/processor 维护一个 stream-time,它被定义为已轮询的任何记录的最高时间戳。这个 stream-time 然后在 Kafka Streams 中用于不同的目的(例如:Punctator、Windowded Aggregation 等)。
[窗口聚合]
如您所述,流时间用于确定是否应接受记录,即 record_accepted = end_window_time(current record) + grace_period > observed stream_time
.
正如您所描述的,如果多个任务 运行 并行地根据分组键随机播放消息,并且某些任务比其他任务慢(或某些分区处于离线状态),这将创建 out-of-订购信息。不幸的是,恐怕解决这个问题的唯一方法是增加 grace_period
.
这其实就是可用性和一致性之间永恒的权衡。
[KafkaStream 和 KafkaStream/KTable 加入的行为
当您使用 Kafka Streams 执行连接操作时,内部任务会分配给多个共同分区主题的“相同”分区。例如,任务 0 将分配给 Topic1-Partition0 和 TopicB-Partition0。
获取的记录按分区缓冲到由任务管理的内部队列中。因此,每个队列包含等待处理的单个分区的所有记录。
然后,记录从队列中一个接一个地轮询,并由拓扑实例处理。但是,这是从轮询返回的具有最低时间戳的非空队列中的记录。
此外,如果队列为空,则任务可能会在一段时间内空闲,从而不再从队列中轮询记录。您实际上可以配置任务保持空闲的最长时间,可以使用流配置来定义:max.task.idle.ms
此机制允许同步共同定位的分区。但是,默认 max.task.idle.ms
设置为 0。这意味着任务永远不会等待来自分区的更多数据,这可能会导致记录被过滤,因为流时间可能会增加得更快。