KTable Reduce 函数不支持窗口化
KTable Reduce function does not honor windowing
需求:- 我们需要合并所有具有相同orderid的消息,并对合并后的消息进行后续操作。
解释:- 下面的代码片段试图捕获从特定租户收到的所有订单消息,并在等待特定时间段后尝试合并为单个订单消息
它执行以下操作
- 基于 OrderId 的重新分区消息。所以每个订单消息都将以 tenantId 和 groupId 作为它的键
- 执行 groupby 键操作,然后 windowed 操作 2 分钟
- windowing 完成后执行归约操作。
- Ktable 再次转换为流回,然后将其输出发送到另一个 kafka 主题
预期输出:- 如果在 window 期间发送了 5 条具有相同订单 ID 的消息。预计最终的kafka主题应该只有一条消息,并且是最后一条reduce操作消息。
实际输出:- 显示所有 5 条消息,表明 windowing 在调用 reduce 操作之前没有发生。在收到每条消息时,kafka 中看到的所有消息都有适当的 reduce 操作。
查询:- 在 kafka 流库版本 0.11.0.0 中,reduce 函数用于接受时间 window 作为其参数。我看到这在 kafka 流版本 1.0.0 中已被弃用。在下面的代码中完成的窗口化是否正确?较新版本的 kafka 流库 1.0.0 是否支持 windowing ?如果是这样,那么下面的代码片段是否可以改进?
String orderMsgTopic = "sampleordertopic";
JsonSerializer<OrderMsg> orderMsgJSONSerialiser = new JsonSerializer<>();
JsonDeserializer<OrderMsg> orderMsgJSONDeSerialiser = new JsonDeserializer<>(OrderMsg.class);
Serde<OrderMsg> orderMsgSerde = Serdes.serdeFrom(orderMsgJSONSerialiser,orderMsgJSONDeSerialiser);
KStream<String, OrderMsg> orderMsgStream = this.builder.stream(orderMsgTopic, Consumed.with(Serdes.ByteArray(), orderMsgSerde))
.map(new KeyValueMapper<byte[], OrderMsg, KeyValue<? extends String, ? extends OrderMsg>>() {
@Override
public KeyValue<? extends String, ? extends OrderMsg> apply(byte[] byteArr, OrderMsg value) {
TenantIdMessageTypeDeserializer deserializer = new TenantIdMessageTypeDeserializer();
TenantIdMessageType tenantIdMessageType = deserializer.deserialize(orderMsgTopic, byteArr);
String newTenantOrderKey = null;
if ((tenantIdMessageType != null) && (tenantIdMessageType.getMessageType() == 1)) {
Long tenantId = tenantIdMessageType.getTenantId();
newTenantOrderKey = tenantId.toString() + value.getOrderKey();
} else {
newTenantOrderKey = value.getOrderKey();
}
return new KeyValue<String, OrderMsg>(newTenantOrderKey, value);
}
});
final KTable<Windowed<String>, OrderMsg> orderGrouping = orderMsgStream.groupByKey(Serialized.with(Serdes.String(), orderMsgSerde))
.windowedBy(TimeWindows.of(windowTime).advanceBy(windowTime))
.reduce(new OrderMsgReducer());
orderGrouping.toStream().map(new KeyValueMapper<Windowed<String>, OrderMsg, KeyValue<String, OrderMsg>>() {
@Override
public KeyValue<String, OrderMsg> apply(Windowed<String> key, OrderMsg value) {
return new KeyValue<String, OrderMsg>(key.key(), value);
}
}).to("newone11", Produced.with(Serdes.String(), orderMsgSerde));
我意识到我已经将 StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG 设置为 0,并且还设置了 1000 毫秒的默认提交间隔。更改此值在某种程度上帮助我使窗口正常工作
需求:- 我们需要合并所有具有相同orderid的消息,并对合并后的消息进行后续操作。
解释:- 下面的代码片段试图捕获从特定租户收到的所有订单消息,并在等待特定时间段后尝试合并为单个订单消息 它执行以下操作
- 基于 OrderId 的重新分区消息。所以每个订单消息都将以 tenantId 和 groupId 作为它的键
- 执行 groupby 键操作,然后 windowed 操作 2 分钟
- windowing 完成后执行归约操作。
- Ktable 再次转换为流回,然后将其输出发送到另一个 kafka 主题
预期输出:- 如果在 window 期间发送了 5 条具有相同订单 ID 的消息。预计最终的kafka主题应该只有一条消息,并且是最后一条reduce操作消息。
实际输出:- 显示所有 5 条消息,表明 windowing 在调用 reduce 操作之前没有发生。在收到每条消息时,kafka 中看到的所有消息都有适当的 reduce 操作。
查询:- 在 kafka 流库版本 0.11.0.0 中,reduce 函数用于接受时间 window 作为其参数。我看到这在 kafka 流版本 1.0.0 中已被弃用。在下面的代码中完成的窗口化是否正确?较新版本的 kafka 流库 1.0.0 是否支持 windowing ?如果是这样,那么下面的代码片段是否可以改进?
String orderMsgTopic = "sampleordertopic";
JsonSerializer<OrderMsg> orderMsgJSONSerialiser = new JsonSerializer<>();
JsonDeserializer<OrderMsg> orderMsgJSONDeSerialiser = new JsonDeserializer<>(OrderMsg.class);
Serde<OrderMsg> orderMsgSerde = Serdes.serdeFrom(orderMsgJSONSerialiser,orderMsgJSONDeSerialiser);
KStream<String, OrderMsg> orderMsgStream = this.builder.stream(orderMsgTopic, Consumed.with(Serdes.ByteArray(), orderMsgSerde))
.map(new KeyValueMapper<byte[], OrderMsg, KeyValue<? extends String, ? extends OrderMsg>>() {
@Override
public KeyValue<? extends String, ? extends OrderMsg> apply(byte[] byteArr, OrderMsg value) {
TenantIdMessageTypeDeserializer deserializer = new TenantIdMessageTypeDeserializer();
TenantIdMessageType tenantIdMessageType = deserializer.deserialize(orderMsgTopic, byteArr);
String newTenantOrderKey = null;
if ((tenantIdMessageType != null) && (tenantIdMessageType.getMessageType() == 1)) {
Long tenantId = tenantIdMessageType.getTenantId();
newTenantOrderKey = tenantId.toString() + value.getOrderKey();
} else {
newTenantOrderKey = value.getOrderKey();
}
return new KeyValue<String, OrderMsg>(newTenantOrderKey, value);
}
});
final KTable<Windowed<String>, OrderMsg> orderGrouping = orderMsgStream.groupByKey(Serialized.with(Serdes.String(), orderMsgSerde))
.windowedBy(TimeWindows.of(windowTime).advanceBy(windowTime))
.reduce(new OrderMsgReducer());
orderGrouping.toStream().map(new KeyValueMapper<Windowed<String>, OrderMsg, KeyValue<String, OrderMsg>>() {
@Override
public KeyValue<String, OrderMsg> apply(Windowed<String> key, OrderMsg value) {
return new KeyValue<String, OrderMsg>(key.key(), value);
}
}).to("newone11", Produced.with(Serdes.String(), orderMsgSerde));
我意识到我已经将 StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG 设置为 0,并且还设置了 1000 毫秒的默认提交间隔。更改此值在某种程度上帮助我使窗口正常工作