如何在 Kafka 流应用程序中统计固定时间 window 的唯一用户数?

How to count unique users in a fixed time window in a Kafka stream app?

对于用户在我们平台上执行的每个事件,我们在唯一主题中都有一条 kafka 消息。每个事件/kafka 消息都有一个公共字段 userId。我们现在想从该主题中了解我们每小时有多少唯一用户。所以我们对事件类型和用户的个人计数不感兴趣。我们只想知道每小时有多少唯一用户处于活动状态。 实现这一目标的最简单方法是什么?我现在的想法好像不是很简单,伪代码看这里:

stream
 .selectKey() // userId
 .groupByKey() // group by userid, results in a KGroupedStream[UserId, Value]
 .aggregate( // initializer, merger und accumulator simply deliver a constant value, the message is now just a tick for that userId key
   TimeWindows.of(3600000)
 ) // result of aggregate is KTable[Windowed[UserId], Const]
 .toStream // convert in stream to be able to map key in next step
 .map() // map key only (Windowed[Userid]) to key = startMs of window to and value Userid
 .groupByKey() // grouping by startMs of windows, which was selected as key before
 .count() // results in a KTable from startMs of window to counts of users (== unique userIds)

有没有更简单的方法?我可能忽略了一些东西。

您可以做两件事:

  1. selectKey()groupByKey()合并为groupBy()
  2. 您不需要 toStream().map() 这一步,但您可以直接在第一个 KTable
  3. 上使用新密钥重新分组

像这样:

stream.groupBy(/* put a KeyValueMapper that return the grouping key */)
      .aggregate(... TimeWindow.of(TimeUnit.HOURS.toMillis(1))
      .groupBy(/* put a KeyValueMapper that return the new grouping key */)
      .count()