Use Kafka Streams for windowing data and processing each window at once
What I want to achieve is to group the messages I receive from a Kafka topic by user, window them so that I can aggregate the messages received within each (5-minute) window, and then collect all the aggregates of each window so I can process them at once, adding them to a report of all the messages received in that 5-minute interval.
The last point seems to be the hard part: Kafka Streams does not seem to provide (at least, I can't find it!) anything that collects everything related to one window into a "finite" stream that can be processed in one place.
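As a side note on how those 5-minute windows behave: Kafka Streams tumbling windows are aligned to the epoch, so a record is assigned to the window starting at `timestamp - (timestamp % size)`. A minimal plain-Java sketch of that grouping (no Kafka dependency; the record data is made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowSketch {
    // Tumbling-window alignment as TimeWindows does it: a record with this
    // timestamp falls into the window [start, start + sizeMs).
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000L; // 5-minute windows
        // (timestamp, userId) pairs standing in for incoming messages
        long[][] records = { {1531502760000L, 1}, {1531502761000L, 1}, {1531502770000L, 2} };

        // group by (windowStart, userId), mirroring groupByKey().windowedBy(...)
        Map<String, List<Long>> byWindowAndUser = new TreeMap<>();
        for (long[] r : records) {
            String key = windowStart(r[0], size) + "/" + r[1];
            byWindowAndUser.computeIfAbsent(key, k -> new ArrayList<>()).add(r[0]);
        }
        byWindowAndUser.forEach((k, v) ->
                System.out.println(k + " -> " + v.size() + " message(s)"));
    }
}
```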
Here is the code I implemented:
StreamsBuilder builder = new StreamsBuilder();
KStream<UserId, Message> messages = builder.stream("KAFKA_TOPIC");

TimeWindowedKStream<UserId, Message> windowedMessages =
        messages
                .groupByKey()
                .windowedBy(TimeWindows.of(SIZE_MS));

KTable<Windowed<UserId>, List<Message>> messagesAggregatedByWindow =
        windowedMessages
                .aggregate(
                        LinkedList::new,
                        new MyAggregator<>(),
                        Materialized.with(new MessageKeySerde(), new MessageListSerde())
                );

messagesAggregatedByWindow
        .toStream()
        .foreach((key, value) ->
                log.info("({}), KEY {} MESSAGE {}", value.size(), key, value));

KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
The result is something like:
KEY [UserId(82770583)@1531502760000/1531502770000] Message [Message(userId=UserId(82770583),message="a"),Message(userId=UserId(82770583),message="b"),Message(userId=UserId(82770583),message="d")]
KEY [UserId(77082590)@1531502760000/1531502770000] Message [Message(userId=UserId(77082590),message="g")]
KEY [UserId(85077691)@1531502750000/1531502760000] Message [Message(userId=UserId(85077691),message="h")]
KEY [UserId(79117307)@1531502780000/1531502790000] Message [Message(userId=UserId(79117307),message="e")]
KEY [UserId(73176289)@1531502760000/1531502770000] Message [Message(userId=UserId(73176289),message="r"),Message(userId=UserId(73176289),message="q")]
KEY [UserId(92077080)@1531502760000/1531502770000] Message [Message(userId=UserId(92077080),message="w")]
KEY [UserId(78530050)@1531502760000/1531502770000] Message [Message(userId=UserId(78530050),message="t")]
KEY [UserId(64640536)@1531502760000/1531502770000] Message [Message(userId=UserId(64640536),message="y")]
There are many log lines for each window, and they are interleaved with the lines of other windows.
What I would like instead is something like:
// Hypothetical implementation
windowedMessages.streamWindows((interval, window) -> process(interval, window));
where the process method would be something like:
// Hypothetical implementation
void process(Interval interval, WindowStream<UserId, List<Message>> windowStream) {
// Create report for the whole window
Report report = new Report(nameFromInterval());
// Loop on the finite iterable that represents the window content
for (WindowStreamEntry<UserId, List<Message>> entry: windowStream) {
report.addLine(entry.getKey(), entry.getValue());
}
report.close();
}
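Until something like the hypothetical streamWindows() exists, one workaround I can think of (a sketch only, untested; "messages-store" and the per-window grouping are assumptions, not part of the code above) is to drop to the Processor API: materialize the aggregate into a named window store, then schedule a punctuator that periodically scans the store with fetchAll() and builds one report per closed window:

```java
import java.time.Duration;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;

// Sketch: assumes the aggregate was materialized as "messages-store"
// (Materialized.as("messages-store").withKeySerde(...).withValueSerde(...)).
class ReportScheduler {
    @SuppressWarnings("unchecked")
    void schedule(ProcessorContext context, long windowSizeMs) {
        WindowStore<UserId, List<Message>> store =
                (WindowStore<UserId, List<Message>>) context.getStateStore("messages-store");
        context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // fetch every entry whose window has already closed
            try (KeyValueIterator<Windowed<UserId>, List<Message>> it =
                         store.fetchAll(0, timestamp - windowSizeMs)) {
                while (it.hasNext()) {
                    KeyValue<Windowed<UserId>, List<Message>> entry = it.next();
                    // group entries by entry.key.window() and emit one Report
                    // per window interval (Report is hypothetical, as above)
                }
            }
        });
    }
}
```

The drawback is that you have to track which windows you already reported yourself (e.g. remember the last scanned upper bound), since fetchAll() will return the same closed windows again on the next punctuation.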
The result would be grouped like this (each report being one call to my callback void process(...)), and the commit for each window would happen only after the whole window has been processed:
Report 1:
KEY [UserId(85077691)@1531502750000/1531502760000] Message [Message(userId=UserId(85077691),message="h")]
Report 2:
KEY [UserId(82770583)@1531502760000/1531502770000] Message [Message(userId=UserId(82770583),message="a"),Message(userId=UserId(82770583),message="b"),Message(userId=UserId(82770583),message="d")]
KEY [UserId(77082590)@1531502760000/1531502770000] Message [Message(userId=UserId(77082590),message="g")]
KEY [UserId(73176289)@1531502760000/1531502770000] Message [Message(userId=UserId(73176289),message="r"),Message(userId=UserId(73176289),message="q")]
KEY [UserId(92077080)@1531502760000/1531502770000] Message [Message(userId=UserId(92077080),message="w")]
KEY [UserId(78530050)@1531502760000/1531502770000] Message [Message(userId=UserId(78530050),message="t")]
KEY [UserId(64640536)@1531502760000/1531502770000] Message [Message(userId=UserId(64640536),message="y")]
Report 3:
KEY [UserId(79117307)@1531502780000/1531502790000] Message [Message(userId=UserId(79117307),message="e")]
I had the same question. I have spoken with the developers of the library, and they said that this is a very common requirement but it has not been implemented yet. It will be released soon.
You can find more information here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
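For reference, KIP-328 shipped in Apache Kafka 2.1 as KTable#suppress(). Applied to the aggregation from the question, a sketch (fragment, not a full program) would look like this; note the explicit grace period, which suppress(untilWindowCloses(...)) needs in order to know when a window is final:

```java
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

// Once a window closes (stream time passes window end + grace), suppress()
// emits exactly one final record per key and window, instead of one update
// per incoming message.
KTable<Windowed<UserId>, List<Message>> aggregated =
        messages
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ZERO))
                .aggregate(LinkedList::new, new MyAggregator<>(),
                        Materialized.with(new MessageKeySerde(), new MessageListSerde()));

aggregated
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach((windowedKey, msgs) ->
                // each record here is the final content of one user's window
                log.info("window {} user {} -> {} message(s)",
                        windowedKey.window(), windowedKey.key(), msgs.size()));
```

Note that this still delivers one final record per (user, window) pair, not one callback per window: to build a single report covering a whole interval, you would still need to group the suppressed records by their window downstream.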