如何在卡夫卡流中关闭 window 时发送有关主题的记录
How to send record on topic when window is closed in kafka streams
事实上,我已经为此苦苦挣扎了几天。我正在使用 4 个主题的记录。我需要通过 TimedWindow 聚合记录。时间到了,我想向接收器主题发送一条已批准的消息或未批准的消息。这可能与 kafka 流有关吗?
它似乎将每条记录都下沉到新主题,即使 window 仍然开放,这真的不是我想要的。
这是简单的代码:
builder.stream(getTopicList(), Consumed.with(Serdes.ByteArray(),
Serdes.ByteArray()))
.flatMap(new ExceptionSafeKeyValueMapper<String,
FooTriggerMessage>("", Serdes.String(),
fooTriggerSerde))
.filter((key, value) -> value.getTriggerEventId() != null)
.groupBy((key, value) -> value.getTriggerEventId().toString(),
Serialized.with(Serdes.String(), fooTriggerSerde))
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30))
.advanceBy(TimeUnit.SECONDS.toMillis(30)))
.aggregate(() -> new BarApprovalMessage(), /* initializer */
(key, value, aggValue) -> getApproval(key, value, aggValue),/*adder*/
Materialized
.<String, BarApprovalMessage, WindowStore<Bytes, byte[]>>as(
storeName) /* state store name */
.withValueSerde(barApprovalSerde))
.toStream().to(appProperties.getBarApprovalEngineOutgoing(),
Produced.with(windowedSerde, barApprovalSerde));
截至目前,每条记录都被下沉到 outgoingTopic,我只希望它在 window 关闭时发送一条消息,可以这么说。
这可能吗?
我回答我自己的问题,如果其他人需要答案。在转换阶段,我使用上下文来创建调度程序。该调度程序采用三个参数。标点的时间间隔,使用的时间(挂钟或流时间)和供应商(满足时间时调用的方法)。我使用挂钟时间并为每个唯一的 window 键启动一个新的调度程序。我将每条消息添加到 KeyValue 存储中,并且 return 为空。然后,在每 30 秒调用一次的方法中,我检查 window 是否已关闭,并遍历密钥库中的消息,聚合并使用 context.forward 和 context.commit。中提琴! 30 秒内收到 4 条消息 window,生成一条消息。
您可以使用抑制功能。
来自Kafka官方指南:
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-results
我遇到了这个问题,但我解决了这个问题,在固定 window 之后添加 grace(0) 并使用 Suppressed API
public void process(KStream<SensorKeyDTO, SensorDataDTO> stream) {
buildAggregateMetricsBySensor(stream)
.to(outputTopic, Produced.with(String(), new SensorAggregateMetricsSerde()));
}
private KStream<String, SensorAggregateMetricsDTO> buildAggregateMetricsBySensor(KStream<SensorKeyDTO, SensorDataDTO> stream) {
return stream
.map((key, val) -> new KeyValue<>(val.getId(), val))
.groupByKey(Grouped.with(String(), new SensorDataSerde()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(WINDOW_SIZE_IN_MINUTES)).grace(Duration.ofMillis(0)))
.aggregate(SensorAggregateMetricsDTO::new,
(String k, SensorDataDTO v, SensorAggregateMetricsDTO va) -> aggregateData(v, va),
buildWindowPersistentStore())
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map((key, value) -> KeyValue.pair(key.key(), value));
}
private Materialized<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>> buildWindowPersistentStore() {
return Materialized
.<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>>as(WINDOW_STORE_NAME)
.withKeySerde(String())
.withValueSerde(new SensorAggregateMetricsSerde());
}
这里可以看到结果
事实上,我已经为此苦苦挣扎了几天。我正在使用 4 个主题的记录。我需要通过 TimedWindow 聚合记录。时间到了,我想向接收器主题发送一条已批准的消息或未批准的消息。这可能与 kafka 流有关吗?
它似乎将每条记录都下沉到新主题,即使 window 仍然开放,这真的不是我想要的。
这是简单的代码:
builder.stream(getTopicList(), Consumed.with(Serdes.ByteArray(),
Serdes.ByteArray()))
.flatMap(new ExceptionSafeKeyValueMapper<String,
FooTriggerMessage>("", Serdes.String(),
fooTriggerSerde))
.filter((key, value) -> value.getTriggerEventId() != null)
.groupBy((key, value) -> value.getTriggerEventId().toString(),
Serialized.with(Serdes.String(), fooTriggerSerde))
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30))
.advanceBy(TimeUnit.SECONDS.toMillis(30)))
.aggregate(() -> new BarApprovalMessage(), /* initializer */
(key, value, aggValue) -> getApproval(key, value, aggValue),/*adder*/
Materialized
.<String, BarApprovalMessage, WindowStore<Bytes, byte[]>>as(
storeName) /* state store name */
.withValueSerde(barApprovalSerde))
.toStream().to(appProperties.getBarApprovalEngineOutgoing(),
Produced.with(windowedSerde, barApprovalSerde));
截至目前,每条记录都被下沉到 outgoingTopic,我只希望它在 window 关闭时发送一条消息,可以这么说。
这可能吗?
我回答我自己的问题,如果其他人需要答案。在转换阶段,我使用上下文来创建调度程序。该调度程序采用三个参数。标点的时间间隔,使用的时间(挂钟或流时间)和供应商(满足时间时调用的方法)。我使用挂钟时间并为每个唯一的 window 键启动一个新的调度程序。我将每条消息添加到 KeyValue 存储中,并且 return 为空。然后,在每 30 秒调用一次的方法中,我检查 window 是否已关闭,并遍历密钥库中的消息,聚合并使用 context.forward 和 context.commit。中提琴! 30 秒内收到 4 条消息 window,生成一条消息。
您可以使用抑制功能。
来自Kafka官方指南:
我遇到了这个问题,但我解决了这个问题,在固定 window 之后添加 grace(0) 并使用 Suppressed API
public void process(KStream<SensorKeyDTO, SensorDataDTO> stream) {
buildAggregateMetricsBySensor(stream)
.to(outputTopic, Produced.with(String(), new SensorAggregateMetricsSerde()));
}
private KStream<String, SensorAggregateMetricsDTO> buildAggregateMetricsBySensor(KStream<SensorKeyDTO, SensorDataDTO> stream) {
return stream
.map((key, val) -> new KeyValue<>(val.getId(), val))
.groupByKey(Grouped.with(String(), new SensorDataSerde()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(WINDOW_SIZE_IN_MINUTES)).grace(Duration.ofMillis(0)))
.aggregate(SensorAggregateMetricsDTO::new,
(String k, SensorDataDTO v, SensorAggregateMetricsDTO va) -> aggregateData(v, va),
buildWindowPersistentStore())
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map((key, value) -> KeyValue.pair(key.key(), value));
}
private Materialized<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>> buildWindowPersistentStore() {
return Materialized
.<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>>as(WINDOW_STORE_NAME)
.withKeySerde(String())
.withValueSerde(new SensorAggregateMetricsSerde());
}
这里可以看到结果