合并kafka流中的记录

merge records in a kafka stream

是否可以在 kafka 中合并记录并将输出发布到不同的流?

例如,有一个针对 kafka 主题的事件流,如下所示

{txnId:1,startTime:0900},{txnId:1,endTime:0905},{txnId:2,endTime:0912},{txnId:3,endTime:0930}, {txnId:2,startTime:0912},{txnId:3,startTime:0925}......

我想通过 txnId 合并这些事件并创建如下所示的合并输出

{txnId:1,startTime:0900,endTime:0905},{txnId:2,startTime:0910,endTime:0912},{txnId:3,startTime:0925,endTime:0930 }

请注意,如果在开始时间事件之前收到 txn Id 的 endTime,则在传入 events.So 中不会维护顺序,那么我们需要等到收到该 txnId 的开始时间事件,然后再启动合并

我浏览了 Kafka Streams 示例附带的字数统计示例,但不清楚如何等待事件然后在进行转换时合并。

非常感谢任何想法。

您可以尝试通过将开始事件和结束事件拆分为 2 个单独的流(以 txnId 为键)然后加入这两个流来解决此问题。

KStream<String, String> eventSource = new StreamBuilder().stream("INPUT-TOPIC");

KStream<String, JsonNode>[] splitEvents = 
          eventSource.map((key, eventString) -> {
                           JsonNode event = new ObjectMapper().readTree(eventString);
                           String txnId = event.path("txnId").asText();
                           return KeyValue.pair(txnId, event);
                        })
                     .branch((key, event) -> event.findValue("startTime") != null,
                             (key, event) -> event.findValue("endTime") != null); 


KStream<String, JsonNode> startEvents = splitEvents[0];
KStream<String, JsonNode> endEvents = splitEvents[1];

如图所示,两个流之间的连接在连接的任一侧都有事件时会产生一个连接结果。因此,这两个事件的顺序无关紧要(您必须确保为加入设置适当的 window 时间段)。

Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());

KStream<String, String> completeEvents = startEvents.join(endEvents, 
               (startEvent, endEvent) -> {
                    // Add logic to merge startEvent and endEvent as seen fit
                       ObjectNode completeEvent = JsonNodeFactory.instance.objectNode();
                       completeEvent.put("startTime",  startEvent.path("startTime).asText());
                       completeEvent.put("endTime",  endEvent.path("endTime").asText());
                       return completeEvent.toString();
                },
               JoinWindows.of(Duration.ofMinutes(15)),
               Joined.with(
                    Serdes.String(),   // key
                    jsonSerde,         // left object
                    jsonSerde          // right object
               )
          );