如何在 Kafka 流应用程序中统计固定时间 window 的唯一用户数?
How to count unique users in a fixed time window in a Kafka stream app?
对于用户在我们平台上执行的每个事件,我们在唯一主题中都有一条 kafka 消息。每个事件/kafka 消息都有一个公共字段 userId。我们现在想从该主题中了解我们每小时有多少唯一用户。所以我们对事件类型和用户的个人计数不感兴趣。我们只想知道每小时有多少唯一用户处于活动状态。
实现这一目标的最简单方法是什么?我现在的想法好像不是很简单,伪代码看这里:
stream
.selectKey() // userId
.groupByKey() // group by userid, results in a KGroupedStream[UserId, Value]
.aggregate( // initializer, merger und accumulator simply deliver a constant value, the message is now just a tick for that userId key
TimeWindows.of(3600000)
) // result of aggregate is KTable[Windowed[UserId], Const]
.toStream // convert in stream to be able to map key in next step
.map() // map key only (Windowed[Userid]) to key = startMs of window to and value Userid
.groupByKey() // grouping by startMs of windows, which was selected as key before
.count() // results in a KTable from startMs of window to counts of users (== unique userIds)
有没有更简单的方法?我可能忽略了一些东西。
您可以做两件事:
- 将
selectKey()
和groupByKey()
合并为groupBy()
- 您不需要
toStream().map()
这一步,但您可以直接在第一个 KTable
上使用新密钥重新分组
像这样:
stream.groupBy(/* put a KeyValueMapper that return the grouping key */)
.aggregate(... TimeWindow.of(TimeUnit.HOURS.toMillis(1))
.groupBy(/* put a KeyValueMapper that return the new grouping key */)
.count()
对于用户在我们平台上执行的每个事件,我们在唯一主题中都有一条 kafka 消息。每个事件/kafka 消息都有一个公共字段 userId。我们现在想从该主题中了解我们每小时有多少唯一用户。所以我们对事件类型和用户的个人计数不感兴趣。我们只想知道每小时有多少唯一用户处于活动状态。 实现这一目标的最简单方法是什么?我现在的想法好像不是很简单,伪代码看这里:
stream
.selectKey() // userId
.groupByKey() // group by userid, results in a KGroupedStream[UserId, Value]
.aggregate( // initializer, merger und accumulator simply deliver a constant value, the message is now just a tick for that userId key
TimeWindows.of(3600000)
) // result of aggregate is KTable[Windowed[UserId], Const]
.toStream // convert in stream to be able to map key in next step
.map() // map key only (Windowed[Userid]) to key = startMs of window to and value Userid
.groupByKey() // grouping by startMs of windows, which was selected as key before
.count() // results in a KTable from startMs of window to counts of users (== unique userIds)
有没有更简单的方法?我可能忽略了一些东西。
您可以做两件事:
- 将
selectKey()
和groupByKey()
合并为groupBy()
- 您不需要
toStream().map()
这一步,但您可以直接在第一个KTable
上使用新密钥重新分组
像这样:
stream.groupBy(/* put a KeyValueMapper that return the grouping key */)
.aggregate(... TimeWindow.of(TimeUnit.HOURS.toMillis(1))
.groupBy(/* put a KeyValueMapper that return the new grouping key */)
.count()