带窗口的 Kafka Streams 拓扑不会触发状态更改

Kafka Streams topology with windowing doesn't trigger state changes

我正在构建以下 Kafka Streams 拓扑(伪代码):

gK = builder.stream().gropuByKey();
g1 = gK.windowedBy(TimeWindows.of("PT1H")).reduce().mapValues().toStream().mapValues().selectKey();
g2 = gK.reduce().mapValues();
g1.leftJoin(g2).to();

如果您注意到,这是一个类似菱形的拓扑,从单个输入主题开始到单个输出主题结束,消息流经两个并行流,最终在最后连接在一起。一个流适用(翻滚?)windowing,另一个不适用。流程的两个部分都使用相同的密钥(除了由 windowing 中间引入的 WindowedKey 之外)。

我的消息的时间戳是事件时间。也就是说,它们是通过我自定义配置的 TimestampExtractor 实现从消息正文中挑选出来的。我消息中的实际时间戳是几年前的事了。

在我使用几个 input/output 消息和 运行time 环境(使用真正的 Kafka)的单元测试中,乍一看一切都很好。

当消息数量开始增加(例如 40K)时,问题似乎就出现了。

我的失败场景如下:

  1. ~40K记录与相同 密钥首先上传到输入主题

  2. ~40K更新 输出话题出来了,符合预期

  3. 另外 ~40K 条记录 与步骤 1) 相同但不同的是将密钥上传到 输入主题

  4. 只有约 100 个更新来自输出主题, 而不是预期的新 ~40K 更新。没有什么特别的 看看那些~100个更新,他们的内容似乎是正确的,但是 仅在特定时间 windows。其他时间 windows 没有 即使流逻辑和输入数据应该明确更新 生成 40K 条记录。事实上,当我在步骤 1 中交换数据集时) 和 3) 我的情况完全相同,来自 ~40K 的更新 第二个数据集和第一个数据集的数字相同~100。

我可以在本地使用 TopologyTestDriver 在单元测试中轻松重现此问题(但仅限于更大数量的输入记录)。

在我的测试中,我尝试使用 StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG 禁用缓存。不幸的是,这没有任何区别。

更新

我尝试了 reduce() 调用和 aggregate() 调用。在这两种情况下问题仍然存在。

我还注意到,当 StreamsConfig.TOPOLOGY_OPTIMIZATION 设置为 StreamsConfig.OPTIMIZE 且没有它时,mapValues() 处理程序在调试器 before 中被调用前面的 reduce()(或 aggregate())处理程序至少是第一次。没想到。

不幸的是,尝试了 join() 和 leftJoin() 相同的结果。 在调试器中,数据的第二部分根本不会触发 "left" 流中的 reduce() 处理程序,但会触发 "right" 流中的 reduce() 处理程序。

根据我的配置,如果两个数据集中的数量或记录均为 100,则问题不会自行显现,如我所料,我会收到 200 条输出消息。当我将每个数据集中的数字增加到 200 时,我收到的预期消息少于 400 条。 所以,目前看来 "old" windows 之类的东西被丢弃了,那些旧的 windows 的新记录被流忽略了。 可以设置 window 保留设置,但使用我使用的默认值,我希望 windows 保留其状态并保持活动状态至少 12 小时(超过我的时间单元测试 运行 显着)。

尝试使用以下 Window 存储配置修改左侧减速器:

Materialized.as(
    Stores.inMemoryWindowStore(
        "rollup-left-reduce",
        Duration.ofDays(5 * 365),
        Duration.ofHours(1), false)
)

结果仍然没有差异。

即使只有一个 "left" 流程,没有 "right" 流程,也没有 join(),同样的问题仍然存在。看来问题出在我设置的 window 保留设置中。我的输入记录的时间戳(事件时间)跨越 2 年。第二个数据集再次从 2 年初开始。 Kafka Streams 中的这个地方确保第二个数据集记录被忽略:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L125

Kafka Streams 版本为 2.4.0。同时使用 Confluent 依赖版本 5.4.0.

我的问题是

调试一段时间后,我找到了问题的原因。

我的输入数据集包含时间戳跨越 2 年的记录。我正在加载第一个数据集,并且我的流的 "observed" 时间被设置为来自输入数据集的最大时间戳。

以时间戳比新观察时间早 2 年的记录开始的第二个数据集的上传导致流内部丢弃消息。如果您将 Kafka 日志记录设置为 TRACE 级别,则可以看到这一点。

因此,为了解决我的问题,我必须为 windows 配置保留期和宽限期:

而不是

.windowedBy(TimeWindows.of(windowSize))

我必须指定

.windowedBy(TimeWindows.of(windowSize).grace(Duration.ofDays(5 * 365)))

此外,我必须明确配置 reducer 存储设置为:

 Materialized.as(
    Stores.inMemoryWindowStore(
        "rollup-left-reduce",
        Duration.ofDays(5 * 365),
        windowSize, false)
)

就是这样,输出符合预期。