kafka stream windowedBy 没有按预期产生结果
kafka stream windowedBy not producing results when expected
我在 kafka 流中做这个简单的窗口聚合:
...
.groupByKey(/* Serde stuff */)
.windowedBy(TimeWindows.of(
Duration.ofSeconds(5)
).grace(Duration.ofSeconds(0)).until(5000))
.aggregate(
(key, val, agg) -> {
// aggregation here
},
Materialized.with(/* Serde stuff */)
)
.toStream()
.to("output")
而且我希望每 5 秒,它会向输出主题产生一个结果,但是当 运行 它时,它需要大约 17 秒。
所以参考了this guide,我把CACHE_MAX_BYTES_BUFFERING_CONFIG
改成0
,把代码弄成这样(只加了suppress
步骤:
...
.groupByKey(/* Serde stuff */)
.windowedBy(TimeWindows.of(
Duration.ofSeconds(5)
).grace(Duration.ofSeconds(0)).until(5000))
.aggregate(
(key, val, agg) -> {
// aggregation here
},
Materialized.with(/* Serde stuff */)
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.to("output")
当 运行 第二次尝试时,它没有输出任何内容,在调试时,我看到消息正在通过流处理,日志中没有异常,但没有任何输出。
我错过了什么吗?这段代码不应该每 5 秒产生一次结果吗?
基于此posthttps://www.nerd.vision/post/suppress-surprise-kafka-streams-and-the-suppress-operator
抑制运算符基于事件时间,只要没有新记录到达,流基本上就会被冻结。
此 post 解释了如何测试它。
要使测试正常进行,您需要:
- 产生测试数据
- 产生一个带有未来时间戳的虚拟事件
释放 window 结果断言。
注意每个测试都需要隔离
(例如,在每次单独测试之前将 Kafka 代理和流启动并在之后关闭或关闭测试驱动程序)。
我在 kafka 流中做这个简单的窗口聚合:
...
.groupByKey(/* Serde stuff */)
.windowedBy(TimeWindows.of(
Duration.ofSeconds(5)
).grace(Duration.ofSeconds(0)).until(5000))
.aggregate(
(key, val, agg) -> {
// aggregation here
},
Materialized.with(/* Serde stuff */)
)
.toStream()
.to("output")
而且我希望每 5 秒,它会向输出主题产生一个结果,但是当 运行 它时,它需要大约 17 秒。
所以参考了this guide,我把CACHE_MAX_BYTES_BUFFERING_CONFIG
改成0
,把代码弄成这样(只加了suppress
步骤:
...
.groupByKey(/* Serde stuff */)
.windowedBy(TimeWindows.of(
Duration.ofSeconds(5)
).grace(Duration.ofSeconds(0)).until(5000))
.aggregate(
(key, val, agg) -> {
// aggregation here
},
Materialized.with(/* Serde stuff */)
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.to("output")
当 运行 第二次尝试时,它没有输出任何内容,在调试时,我看到消息正在通过流处理,日志中没有异常,但没有任何输出。
我错过了什么吗?这段代码不应该每 5 秒产生一次结果吗?
基于此posthttps://www.nerd.vision/post/suppress-surprise-kafka-streams-and-the-suppress-operator
抑制运算符基于事件时间,只要没有新记录到达,流基本上就会被冻结。
此 post 解释了如何测试它。
要使测试正常进行,您需要:
- 产生测试数据
- 产生一个带有未来时间戳的虚拟事件 释放 window 结果断言。
注意每个测试都需要隔离 (例如,在每次单独测试之前将 Kafka 代理和流启动并在之后关闭或关闭测试驱动程序)。