Kafka 流 window 批处理
Kafka streams window batching
来自 Spark Streaming 背景 - 掌握 Kafka 流。
我有一个从 Kafka 读取的简单 Spark Streaming 应用程序,
和returns那一分钟每个用户的最新事件
示例事件看起来像 {"user": 1, "timestamp": "2018-05-18T16:56:30.754Z", "count": 3}, {"user": 1, "timestamp": "2018-05-22T16:56:39.754Z", "count": 4}
我对这在 Kafka Streams 中的工作方式很感兴趣,因为似乎每个事件都有一个输出 - 当我的用例是减少流量时。
从我目前的阅读来看,这似乎不是直截了当的,您必须使用处理器 api。
理想情况下,我想使用 DSL 而不是处理器 API,因为我刚刚开始研究 Kafka 流,但似乎我必须使用处理器 API的 punctuate
方法每 n 秒从状态存储中读取一次?
我正在使用 kafka 0.11.0
在 DSL 级别,Kafka Streams 允许配置 KTable 缓存(默认启用)以减少下游负载。缓存是定期刷新的 LRU 缓存。因此,虽然缓存减少了下游负载,但它不能保证每个 window 得到多少输出。 (比照https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html)
如果您严格需要每个 window 的单个输出,使用处理器 API 是正确的方法。
我自己还没有尝试过,但是Kafka Streams现在支持suppress
操作。看看here:
You can use Suppress to effectively rate-limit just one KTable output
or callback. Or, especially valuable for non-retractable outputs like
alerts, you can use it to get only the final results of a windowed
aggregation. The clearest use case for Suppress at the moment is
getting final results
根据文章,代码可能如下所示:
events
.groupByKey()
.windowedBy(
TimeWindows.of(Duration.ofMinutes(2).withGrace(Duration.ofMinutes(2))
)
.count(Materialized.as("count-metric"))
.suppress(Suppressed.untilWindowClose(BufferConfig.unbounded()))
.filter( _ < 4 )
.toStream()
.foreach( /* Send that email! */)
我正在使用 Kafka Streams 2.6.0,我能够重复使用相同的方法来构建流。
来自 Spark Streaming 背景 - 掌握 Kafka 流。
我有一个从 Kafka 读取的简单 Spark Streaming 应用程序,
和returns那一分钟每个用户的最新事件
示例事件看起来像 {"user": 1, "timestamp": "2018-05-18T16:56:30.754Z", "count": 3}, {"user": 1, "timestamp": "2018-05-22T16:56:39.754Z", "count": 4}
我对这在 Kafka Streams 中的工作方式很感兴趣,因为似乎每个事件都有一个输出 - 当我的用例是减少流量时。
从我目前的阅读来看,这似乎不是直截了当的,您必须使用处理器 api。
理想情况下,我想使用 DSL 而不是处理器 API,因为我刚刚开始研究 Kafka 流,但似乎我必须使用处理器 API的 punctuate
方法每 n 秒从状态存储中读取一次?
我正在使用 kafka 0.11.0
在 DSL 级别,Kafka Streams 允许配置 KTable 缓存(默认启用)以减少下游负载。缓存是定期刷新的 LRU 缓存。因此,虽然缓存减少了下游负载,但它不能保证每个 window 得到多少输出。 (比照https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html)
如果您严格需要每个 window 的单个输出,使用处理器 API 是正确的方法。
我自己还没有尝试过,但是Kafka Streams现在支持suppress
操作。看看here:
You can use Suppress to effectively rate-limit just one KTable output or callback. Or, especially valuable for non-retractable outputs like alerts, you can use it to get only the final results of a windowed aggregation. The clearest use case for Suppress at the moment is getting final results
根据文章,代码可能如下所示:
events
.groupByKey()
.windowedBy(
TimeWindows.of(Duration.ofMinutes(2).withGrace(Duration.ofMinutes(2))
)
.count(Materialized.as("count-metric"))
.suppress(Suppressed.untilWindowClose(BufferConfig.unbounded()))
.filter( _ < 4 )
.toStream()
.foreach( /* Send that email! */)
我正在使用 Kafka Streams 2.6.0,我能够重复使用相同的方法来构建流。