如何从kafka流中获取窗口聚合?
How to get windowed aggregation from kafka stream?
我有一系列事件,我想根据时间 windows 进行汇总。我的解决方案提供增量聚合,而不是按时间 window 提供聚合。我读到这对于流来说是正常的,因为它会将结果作为更改日志给出。同样在研究过程中,我遇到了 and 。但是第一个 post 中的解决方案有些过时(使用已弃用的 API)。我使用了新的 API,这是在那些已弃用的 API 中建议的。这是我的解决方案,
KStream<String, Event> eventKStream = summarizableData.mapValues(v -> v.getEvent());
KGroupedStream<String, Event> kGroupedStream = eventKStream.groupBy((key, value) -> {
String groupBy = getGroupBy(value, criteria);
return groupBy;
}, Serialized.with(Serdes.String(), eventSerde));
long windowSizeMs = TimeUnit.SECONDS.toMillis(applicationProperties.getWindowSizeInSeconds());
final TimeWindowedKStream<String, Event> groupedByKeyForWindow = kGroupedStream
.windowedBy(TimeWindows.of(windowSizeMs)
.advanceBy(windowSizeMs));
但是正如我之前解释的那样,我的结果不是在特定时间给出的 windows,而是作为增量聚合给出的。我需要我的数据在 windowSize 中指定的时间输出。我还读到 CACHE_MAX_BYTES_BUFFERING_CONFIG
可以控制输出,但我需要适用于每种情况的可靠解决方案。另请注意,https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows wiki 中给出的模式现已过时,因为它使用旧的 APIs。 (我使用的是kafka-streams 1.1.0版本)
问题是我的错误。以上,代码示例工作正常。但最后我将 KTable
转换为 KStream
。这就是问题所在。转换为 KStream
也会导致输出中间结果。 https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows 中给出的模式工作正常。通过有问题的代码,
// Aggregation
KTable<Windowed<String>, Event> results = groupedByKeyForWindow.aggregate(new AggregateInitiator(), new EventAggregator());
// This converstion causing changelog to output. Instead use next line.
KStream<String, AggregationMessage> aggregationMessageKStream = results.toStream((key, value) -> key.toString())
.mapValues(this::convertToAggregationMessage).filter((k, v) -> v != null);
// output KTable to sample topic. But this output controlled by
// COMMIT_INTERVAL_MS_CONFIG and CACHE_MAX_BYTES_BUFFERING_CONFIG parameters.
// I'm using default values for these params.
results.to(windowedSerde, eventSerde, "Sample");
我有一系列事件,我想根据时间 windows 进行汇总。我的解决方案提供增量聚合,而不是按时间 window 提供聚合。我读到这对于流来说是正常的,因为它会将结果作为更改日志给出。同样在研究过程中,我遇到了
KStream<String, Event> eventKStream = summarizableData.mapValues(v -> v.getEvent());
KGroupedStream<String, Event> kGroupedStream = eventKStream.groupBy((key, value) -> {
String groupBy = getGroupBy(value, criteria);
return groupBy;
}, Serialized.with(Serdes.String(), eventSerde));
long windowSizeMs = TimeUnit.SECONDS.toMillis(applicationProperties.getWindowSizeInSeconds());
final TimeWindowedKStream<String, Event> groupedByKeyForWindow = kGroupedStream
.windowedBy(TimeWindows.of(windowSizeMs)
.advanceBy(windowSizeMs));
但是正如我之前解释的那样,我的结果不是在特定时间给出的 windows,而是作为增量聚合给出的。我需要我的数据在 windowSize 中指定的时间输出。我还读到 CACHE_MAX_BYTES_BUFFERING_CONFIG
可以控制输出,但我需要适用于每种情况的可靠解决方案。另请注意,https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows wiki 中给出的模式现已过时,因为它使用旧的 APIs。 (我使用的是kafka-streams 1.1.0版本)
问题是我的错误。以上,代码示例工作正常。但最后我将 KTable
转换为 KStream
。这就是问题所在。转换为 KStream
也会导致输出中间结果。 https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows 中给出的模式工作正常。通过有问题的代码,
// Aggregation
KTable<Windowed<String>, Event> results = groupedByKeyForWindow.aggregate(new AggregateInitiator(), new EventAggregator());
// This converstion causing changelog to output. Instead use next line.
KStream<String, AggregationMessage> aggregationMessageKStream = results.toStream((key, value) -> key.toString())
.mapValues(this::convertToAggregationMessage).filter((k, v) -> v != null);
// output KTable to sample topic. But this output controlled by
// COMMIT_INTERVAL_MS_CONFIG and CACHE_MAX_BYTES_BUFFERING_CONFIG parameters.
// I'm using default values for these params.
results.to(windowedSerde, eventSerde, "Sample");