合并kafka流中的记录
merge records in a kafka stream
是否可以在 kafka 中合并记录并将输出发布到不同的流?
例如,有一个针对 kafka 主题的事件流,如下所示
{txnId:1,startTime:0900},{txnId:1,endTime:0905},{txnId:2,endTime:0912},{txnId:3,endTime:0930}, {txnId:2,startTime:0912},{txnId:3,startTime:0925}......
我想通过 txnId 合并这些事件并创建如下所示的合并输出
{txnId:1,startTime:0900,endTime:0905},{txnId:2,startTime:0910,endTime:0912},{txnId:3,startTime:0925,endTime:0930 }
请注意,如果在开始时间事件之前收到 txn Id 的 endTime,则在传入 events.So 中不会维护顺序,那么我们需要等到收到该 txnId 的开始时间事件,然后再启动合并
我浏览了 Kafka Streams 示例附带的字数统计示例,但不清楚如何等待事件然后在进行转换时合并。
非常感谢任何想法。
您可以尝试通过将开始事件和结束事件拆分为 2 个单独的流(以 txnId 为键)然后加入这两个流来解决此问题。
KStream<String, String> eventSource = new StreamBuilder().stream("INPUT-TOPIC");
KStream<String, JsonNode>[] splitEvents =
eventSource.map((key, eventString) -> {
JsonNode event = new ObjectMapper().readTree(eventString);
String txnId = event.path("txnId").asText();
return KeyValue.pair(txnId, event);
})
.branch((key, event) -> event.findValue("startTime") != null,
(key, event) -> event.findValue("endTime") != null);
KStream<String, JsonNode> startEvents = splitEvents[0];
KStream<String, JsonNode> endEvents = splitEvents[1];
如图所示,两个流之间的连接在连接的任一侧都有事件时会产生一个连接结果。因此,这两个事件的顺序无关紧要(您必须确保为加入设置适当的 window 时间段)。
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
KStream<String, String> completeEvents = startEvents.join(endEvents,
(startEvent, endEvent) -> {
// Add logic to merge startEvent and endEvent as seen fit
ObjectNode completeEvent = JsonNodeFactory.instance.objectNode();
completeEvent.put("startTime", startEvent.path("startTime).asText());
completeEvent.put("endTime", endEvent.path("endTime").asText());
return completeEvent.toString();
},
JoinWindows.of(Duration.ofMinutes(15)),
Joined.with(
Serdes.String(), // key
jsonSerde, // left object
jsonSerde // right object
)
);
是否可以在 kafka 中合并记录并将输出发布到不同的流?
例如,有一个针对 kafka 主题的事件流,如下所示
{txnId:1,startTime:0900},{txnId:1,endTime:0905},{txnId:2,endTime:0912},{txnId:3,endTime:0930}, {txnId:2,startTime:0912},{txnId:3,startTime:0925}......
我想通过 txnId 合并这些事件并创建如下所示的合并输出
{txnId:1,startTime:0900,endTime:0905},{txnId:2,startTime:0910,endTime:0912},{txnId:3,startTime:0925,endTime:0930 }
请注意,如果在开始时间事件之前收到 txn Id 的 endTime,则在传入 events.So 中不会维护顺序,那么我们需要等到收到该 txnId 的开始时间事件,然后再启动合并
我浏览了 Kafka Streams 示例附带的字数统计示例,但不清楚如何等待事件然后在进行转换时合并。
非常感谢任何想法。
您可以尝试通过将开始事件和结束事件拆分为 2 个单独的流(以 txnId 为键)然后加入这两个流来解决此问题。
KStream<String, String> eventSource = new StreamBuilder().stream("INPUT-TOPIC");
KStream<String, JsonNode>[] splitEvents =
eventSource.map((key, eventString) -> {
JsonNode event = new ObjectMapper().readTree(eventString);
String txnId = event.path("txnId").asText();
return KeyValue.pair(txnId, event);
})
.branch((key, event) -> event.findValue("startTime") != null,
(key, event) -> event.findValue("endTime") != null);
KStream<String, JsonNode> startEvents = splitEvents[0];
KStream<String, JsonNode> endEvents = splitEvents[1];
如图所示,两个流之间的连接在连接的任一侧都有事件时会产生一个连接结果。因此,这两个事件的顺序无关紧要(您必须确保为加入设置适当的 window 时间段)。
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
KStream<String, String> completeEvents = startEvents.join(endEvents,
(startEvent, endEvent) -> {
// Add logic to merge startEvent and endEvent as seen fit
ObjectNode completeEvent = JsonNodeFactory.instance.objectNode();
completeEvent.put("startTime", startEvent.path("startTime).asText());
completeEvent.put("endTime", endEvent.path("endTime").asText());
return completeEvent.toString();
},
JoinWindows.of(Duration.ofMinutes(15)),
Joined.with(
Serdes.String(), // key
jsonSerde, // left object
jsonSerde // right object
)
);