使用 kafka 流在 window 时间内获取给定键的最后一个事件
Getting the last event of a given key in a time window with kafka stream
我开始使用 KStream 来使用现有主题中的数据。
我只对在 10 秒内获取给定 ID 的最后一个事件感兴趣 window。我尝试使用以下代码:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, MySale> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), specificAvroSerde));
stream.selectKey((key, value) -> value.getID())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.reduce((value1, value2) -> value2)
.toStream()
.peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));
但我最终得到了所有事件,而不仅仅是最后一个。是否可以使用 KStream 做我想做的事?
使用.suppress()
它抑制了 window 的所有中间结果,只发出最终结果。
stream.selectKey((key, value) -> value.getID())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.reduce((value1, value2) -> value2)
.suppress(Suppressed.untilWindowCloses(unbounded()))) // like this
.toStream()
.peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));
您可以在此处阅读更多内容:https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
我开始使用 KStream 来使用现有主题中的数据。
我只对在 10 秒内获取给定 ID 的最后一个事件感兴趣 window。我尝试使用以下代码:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, MySale> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), specificAvroSerde));
stream.selectKey((key, value) -> value.getID())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.reduce((value1, value2) -> value2)
.toStream()
.peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));
但我最终得到了所有事件,而不仅仅是最后一个。是否可以使用 KStream 做我想做的事?
使用.suppress()
它抑制了 window 的所有中间结果,只发出最终结果。
stream.selectKey((key, value) -> value.getID())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.reduce((value1, value2) -> value2)
.suppress(Suppressed.untilWindowCloses(unbounded()))) // like this
.toStream()
.peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));
您可以在此处阅读更多内容:https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results