如何发送时间窗 KTable 的最终 kafka-streams 聚合结果?
How to send final kafka-streams aggregation result of a time windowed KTable?
我想做的是:
- 从数字主题(龙的)消费记录
- 聚合(计数)每 5 秒的值window
- 将最终聚合结果发送到另一个主题
我的代码如下所示:
KStream<String, Long> longs = builder.stream(
Serdes.String(), Serdes.Long(), "longs");
// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts =
longs.countByKey(TimeWindows.of("longCounts", 5000L));
// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
.to("long-counts");
看起来一切都按预期进行,但聚合已发送到每个传入记录的目标主题。我的问题是如何只发送每个 window?
的最终聚合结果
在 Kafka Streams 中没有 "final aggregation" 这样的东西。 Windows 一直保持打开状态,以处理在 window 结束时间过后到达的无序记录。但是,windows 不会永远保留。一旦保留时间到期,它们就会被丢弃。没有关于何时丢弃 window 的特殊操作。
有关详细信息,请参阅 Confluent 文档:http://docs.confluent.io/current/streams/
因此,对于聚合的每次更新,都会产生一个结果记录(因为 Kafka Streams 也会在乱序记录上更新聚合结果)。您的 "final result" 将是最新的结果记录(在 window 被丢弃之前)。根据您的用例,手动重复数据删除将是解决问题的一种方法(使用较低的杠杆 API、transform()
或 process()
)
此博客 post 也可能有帮助:https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html
另一个博客 post 在不使用标点符号的情况下解决了这个问题:http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html
更新
在 KIP-328 中,添加了一个 KTable#suppress()
运算符,这将允许以严格的方式抑制连续更新并为每个 window 发出单个结果记录;权衡是增加延迟。
从 Kafka Streams 版本 2.1 开始,您可以实现此 using suppress
.
上述 Apache Kafka Streams 文档中有一个示例,当用户在一小时内发生的事件少于三个时,它会发送警报:
KGroupedStream<UserId, Event> grouped = ...;
grouped
.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()))
.filter((windowedUserId, count) -> count < 3)
.toStream()
.foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));
如 answer, you should be aware of the tradeoff. Moreover, 的更新所述,suppress() 基于事件时间。
我遇到了这个问题,但我解决了这个问题,在固定 window 之后添加 grace(0) 并使用 Suppressed API
public void process(KStream<SensorKeyDTO, SensorDataDTO> stream) {
buildAggregateMetricsBySensor(stream)
.to(outputTopic, Produced.with(String(), new SensorAggregateMetricsSerde()));
}
private KStream<String, SensorAggregateMetricsDTO> buildAggregateMetricsBySensor(KStream<SensorKeyDTO, SensorDataDTO> stream) {
return stream
.map((key, val) -> new KeyValue<>(val.getId(), val))
.groupByKey(Grouped.with(String(), new SensorDataSerde()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(WINDOW_SIZE_IN_MINUTES)).grace(Duration.ofMillis(0)))
.aggregate(SensorAggregateMetricsDTO::new,
(String k, SensorDataDTO v, SensorAggregateMetricsDTO va) -> aggregateData(v, va),
buildWindowPersistentStore())
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map((key, value) -> KeyValue.pair(key.key(), value));
}
private Materialized<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>> buildWindowPersistentStore() {
return Materialized
.<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>>as(WINDOW_STORE_NAME)
.withKeySerde(String())
.withValueSerde(new SensorAggregateMetricsSerde());
}
这里可以看到结果
我想做的是:
- 从数字主题(龙的)消费记录
- 聚合(计数)每 5 秒的值window
- 将最终聚合结果发送到另一个主题
我的代码如下所示:
KStream<String, Long> longs = builder.stream(
Serdes.String(), Serdes.Long(), "longs");
// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts =
longs.countByKey(TimeWindows.of("longCounts", 5000L));
// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
.to("long-counts");
看起来一切都按预期进行,但聚合已发送到每个传入记录的目标主题。我的问题是如何只发送每个 window?
的最终聚合结果在 Kafka Streams 中没有 "final aggregation" 这样的东西。 Windows 一直保持打开状态,以处理在 window 结束时间过后到达的无序记录。但是,windows 不会永远保留。一旦保留时间到期,它们就会被丢弃。没有关于何时丢弃 window 的特殊操作。
有关详细信息,请参阅 Confluent 文档:http://docs.confluent.io/current/streams/
因此,对于聚合的每次更新,都会产生一个结果记录(因为 Kafka Streams 也会在乱序记录上更新聚合结果)。您的 "final result" 将是最新的结果记录(在 window 被丢弃之前)。根据您的用例,手动重复数据删除将是解决问题的一种方法(使用较低的杠杆 API、transform()
或 process()
)
此博客 post 也可能有帮助:https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html
另一个博客 post 在不使用标点符号的情况下解决了这个问题:http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html
更新
在 KIP-328 中,添加了一个 KTable#suppress()
运算符,这将允许以严格的方式抑制连续更新并为每个 window 发出单个结果记录;权衡是增加延迟。
从 Kafka Streams 版本 2.1 开始,您可以实现此 using suppress
.
上述 Apache Kafka Streams 文档中有一个示例,当用户在一小时内发生的事件少于三个时,它会发送警报:
KGroupedStream<UserId, Event> grouped = ...;
grouped
.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()))
.filter((windowedUserId, count) -> count < 3)
.toStream()
.foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));
如
我遇到了这个问题,但我解决了这个问题,在固定 window 之后添加 grace(0) 并使用 Suppressed API
public void process(KStream<SensorKeyDTO, SensorDataDTO> stream) {
buildAggregateMetricsBySensor(stream)
.to(outputTopic, Produced.with(String(), new SensorAggregateMetricsSerde()));
}
private KStream<String, SensorAggregateMetricsDTO> buildAggregateMetricsBySensor(KStream<SensorKeyDTO, SensorDataDTO> stream) {
return stream
.map((key, val) -> new KeyValue<>(val.getId(), val))
.groupByKey(Grouped.with(String(), new SensorDataSerde()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(WINDOW_SIZE_IN_MINUTES)).grace(Duration.ofMillis(0)))
.aggregate(SensorAggregateMetricsDTO::new,
(String k, SensorDataDTO v, SensorAggregateMetricsDTO va) -> aggregateData(v, va),
buildWindowPersistentStore())
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map((key, value) -> KeyValue.pair(key.key(), value));
}
private Materialized<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>> buildWindowPersistentStore() {
return Materialized
.<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>>as(WINDOW_STORE_NAME)
.withKeySerde(String())
.withValueSerde(new SensorAggregateMetricsSerde());
}
这里可以看到结果