Kafka 流 - 根据消息组设置不同的时间 window
Kafka streams - set a different time window depending on the message group
我想知道如果给定一个 KStream,是否可以根据消息组设置不同的时间 window,例如,对于 groupBy "A" 5 秒,对于 groupBy "B" 10 秒 ...
KStream<String, Msg> stream = builder.stream(stringSerde, msgSerde, input);
stream.groupBy((key, msg) -> msg.getPool())
.aggregate(init, agg, TimeWindows.of(wndLength).advanceBy(wndLength), msgSerde)
...
想到的最简单的方法是在 .groupBy()
/.aggregate()
之前 .filter()
或 .branch()
,例如:
KStream<String, Msg> stream = builder.stream(stringSerde, msgSerde, input);
stream.filter((key, msg) -> msg.getPool().equals("A"))
.groupBy((key, msg) -> msg.getPool())
.aggregate(init, agg, TimeWindows.of(wndLength).advanceBy(wndLength), msgSerde)
...
我想知道如果给定一个 KStream,是否可以根据消息组设置不同的时间 window,例如,对于 groupBy "A" 5 秒,对于 groupBy "B" 10 秒 ...
KStream<String, Msg> stream = builder.stream(stringSerde, msgSerde, input);
stream.groupBy((key, msg) -> msg.getPool())
.aggregate(init, agg, TimeWindows.of(wndLength).advanceBy(wndLength), msgSerde)
...
想到的最简单的方法是在 .groupBy()
/.aggregate()
之前 .filter()
或 .branch()
,例如:
KStream<String, Msg> stream = builder.stream(stringSerde, msgSerde, input);
stream.filter((key, msg) -> msg.getPool().equals("A"))
.groupBy((key, msg) -> msg.getPool())
.aggregate(init, agg, TimeWindows.of(wndLength).advanceBy(wndLength), msgSerde)
...