Kafka Streams 时间序列聚合

Kafka Streams timeseries aggregation

我正在使用 Kafka Streams 来处理时间序列数据。一个用例是每小时为每个传感器聚合数据(传感器 ID 是主题 test 中的消息键)。

我写了一个管道,它按键(传感器 ID)分组,然后计算每小时的读数。

问题是 test 主题中有一些重复消息(相同的传感器 ID 和时间戳)。我只想考虑最新消息。

Streams DSL API 中有什么东西可以完成这个吗?

  meterDataStream
   .groupByKey()
   .count(
     TimeWindows
       .of(TimeUnit.HOURS.toMillis(1))
       .until(TimeUnit.HOURS.toMillis(1)), 
     "counts")
   .foreach((key, value) => {
     val start = epochMillistoDate(key.window().start())
     val end   = epochMillistoDate(key.window().end())
     logger.info(s"$start - $end\t->$value")
   })

您需要为此构建自己的重复数据删除运算符。

meterDateStream
    .transform(/*write your own deduplicator*/)
    .groupByKey()....

重复数据删除器(即 Transformer)必须有一个附加的状态存储,您可能需要检查标点符号。查看文档了解更多详情: