Kafka Streams 的时间 window 如何或何时过期?
How or When is time window of Kafka Streams expired?
我在我的应用程序中使用 kafka 流,我对聚合函数中的时间 window 有疑问。
KTable<Windowed<String>, PredictReq> windowedKtable = views.map(new ValueMapper()).groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
.aggregate(new ADInitializer(), new ADAggregator(),Materialized.with(Serdes.String(), ReqJsonSerde));
KStream<Windowed<String>, Req> filtered = windowedKtable.toStream().transform(new ADTransformerFilter());
KStream<String, String> result = filtered.transform(new ADTransformerTrans());
我在1分钟内汇总了数据window,然后转换得到最终的聚合结果并进行第二次转换。
这是一些示例数据:
msg1: 10:00:00 来了,msg2: 10:00:20 来了,msg3: 10:01:10 来了
例如,window 从 10:00:00 开始到 10:01:00。
我发现 windows 没有过期,直到 msg3 来了! (因为后面的transform直到msg3来了才执行。)
这不是我想要的。
我的测试有问题吗?如果这是事实,如何改变?
我明白了...
Kafka 流没有 window expired
的概念。所以我在消息中使用 window 来检查 window 是否已更改,因此我必须等待下一个 window.
的消息
如果没有收到下一条消息,我不知道 window 已经完成了。
我在我的应用程序中使用 kafka 流,我对聚合函数中的时间 window 有疑问。
KTable<Windowed<String>, PredictReq> windowedKtable = views.map(new ValueMapper()).groupByKey().windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
.aggregate(new ADInitializer(), new ADAggregator(),Materialized.with(Serdes.String(), ReqJsonSerde));
KStream<Windowed<String>, Req> filtered = windowedKtable.toStream().transform(new ADTransformerFilter());
KStream<String, String> result = filtered.transform(new ADTransformerTrans());
我在1分钟内汇总了数据window,然后转换得到最终的聚合结果并进行第二次转换。
这是一些示例数据:
msg1: 10:00:00 来了,msg2: 10:00:20 来了,msg3: 10:01:10 来了
例如,window 从 10:00:00 开始到 10:01:00。
我发现 windows 没有过期,直到 msg3 来了! (因为后面的transform直到msg3来了才执行。)
这不是我想要的。
我的测试有问题吗?如果这是事实,如何改变?
我明白了...
Kafka 流没有 window expired
的概念。所以我在消息中使用 window 来检查 window 是否已更改,因此我必须等待下一个 window.
如果没有收到下一条消息,我不知道 window 已经完成了。