Kafka 流 window 批处理

Kafka streams window batching

来自 Spark Streaming 背景 - 掌握 Kafka 流。

我有一个从 Kafka 读取的简单 Spark Streaming 应用程序,

和returns那一分钟每个用户的最新事件

示例事件看起来像 {"user": 1, "timestamp": "2018-05-18T16:56:30.754Z", "count": 3}, {"user": 1, "timestamp": "2018-05-22T16:56:39.754Z", "count": 4}

我对这在 Kafka Streams 中的工作方式很感兴趣,因为似乎每个事件都有一个输出 - 当我的用例是减少流量时。

从我目前的阅读来看,这似乎不是直截了当的,您必须使用处理器 api。

理想情况下,我想使用 DSL 而不是处理器 API,因为我刚刚开始研究 Kafka 流,但似乎我必须使用处理器 API的 punctuate 方法每 n 秒从状态存储中读取一次?

我正在使用 kafka 0.11.0

在 DSL 级别,Kafka Streams 允许配置 KTable 缓存(默认启用)以减少下游负载。缓存是定期刷新的 LRU 缓存。因此,虽然缓存减少了下游负载,但它不能保证每个 window 得到多少输出。 (比照https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html

如果您严格需要每个 window 的单个输出,使用处理器 API 是正确的方法。

我自己还没有尝试过,但是Kafka Streams现在支持suppress操作。看看here:

You can use Suppress to effectively rate-limit just one KTable output or callback. Or, especially valuable for non-retractable outputs like alerts, you can use it to get only the final results of a windowed aggregation. The clearest use case for Suppress at the moment is getting final results

根据文章,代码可能如下所示:

events
  .groupByKey()
  .windowedBy(
    TimeWindows.of(Duration.ofMinutes(2).withGrace(Duration.ofMinutes(2))
  )
  .count(Materialized.as("count-metric"))
  .suppress(Suppressed.untilWindowClose(BufferConfig.unbounded()))
  .filter( _ < 4 )
  .toStream()
  .foreach( /* Send that email! */)

我正在使用 Kafka Streams 2.6.0,我能够重复使用相同的方法来构建流。