来自不同 Kafka 主题的事件数量的聚合(求和)
Aggregation (summation) of number of events from different Kafka topics
我的应用程序有三个主题接收一些属于用户的事件:
Event Type A -> Topic A
Event Type B -> Topic B
Event Type C -> Topic C
这将是消息流的示例:
Message(user 1 - event A - 2020-01-03)
Message(user 2 - event A - 2020-01-03)
Message(user 1 - event C - 2020-01-20)
Message(user 1 - event B - 2020-01-22)
我希望能够生成包含每个用户每月事件总数的报告,汇总来自三个主题的所有事件,例如:
User 1 - 2020-01 -> 3 total events
User 2 - 2020-01 -> 1 total events
拥有三个 KStreams(每个主题一个),我如何每月执行此加法以获得来自三个不同主题的所有事件的总和?你能显示这个代码吗?
因为您只对计数感兴趣,所以最简单的方法是只保留用户 ID 作为键,并为每个 KStream
保留一些虚拟值,合并所有三个流并执行 window之后计算(请注意,不支持基于日历的 windows 开箱即用;您可以使用 31 天 window 作为近似值或构建您自己的自定义 windows):
// just map to dummy empty string (note, that `null` would not work
KStream<UserId, String> streamA = builder.stream("topic-A").mapValues(v -> "");
KStream<UserId, String> streamB = builder.stream("topic-B").mapValues(v -> "");
KStream<UserId, String> streamC = builder.stream("topic-C").mapValues(v -> "");
streamA.merge(streamB).merge(streamC).groupByKey().windowBy(...).count();
您可能还对 suppress()
运算符感兴趣。
我的应用程序有三个主题接收一些属于用户的事件:
Event Type A -> Topic A
Event Type B -> Topic B
Event Type C -> Topic C
这将是消息流的示例:
Message(user 1 - event A - 2020-01-03)
Message(user 2 - event A - 2020-01-03)
Message(user 1 - event C - 2020-01-20)
Message(user 1 - event B - 2020-01-22)
我希望能够生成包含每个用户每月事件总数的报告,汇总来自三个主题的所有事件,例如:
User 1 - 2020-01 -> 3 total events
User 2 - 2020-01 -> 1 total events
拥有三个 KStreams(每个主题一个),我如何每月执行此加法以获得来自三个不同主题的所有事件的总和?你能显示这个代码吗?
因为您只对计数感兴趣,所以最简单的方法是只保留用户 ID 作为键,并为每个 KStream
保留一些虚拟值,合并所有三个流并执行 window之后计算(请注意,不支持基于日历的 windows 开箱即用;您可以使用 31 天 window 作为近似值或构建您自己的自定义 windows):
// just map to dummy empty string (note, that `null` would not work
KStream<UserId, String> streamA = builder.stream("topic-A").mapValues(v -> "");
KStream<UserId, String> streamB = builder.stream("topic-B").mapValues(v -> "");
KStream<UserId, String> streamC = builder.stream("topic-C").mapValues(v -> "");
streamA.merge(streamB).merge(streamC).groupByKey().windowBy(...).count();
您可能还对 suppress()
运算符感兴趣。