End-of-window 与 KafkaStreams 的外部连接

End-of-window outer join with KafkaStreams

我有一个 Kafka 主题,我希望消息具有两种不同的密钥类型:旧的和新的。 即 "1-new""1-old""2-new""2-old"。密钥是唯一的,但有些可能会丢失。

现在使用 Kotlin 和 KafkaStreams API 我可以记录新旧密钥 ID 相同的消息。

    val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())

    val newStream = stream.filter({ key, _ -> isNew(key) })
            .map({key, value ->  KeyValue(key.replace(NEW_PREFIX, ""), value) })

    val oldStream = stream.filter({ key, _ -> isOld(key) })
            .map({key, value ->  KeyValue(key.replace(OLD_PREFIX, ""), value) })

    val joined = newStream.join(oldStream,
            { value1, value2 -> "$value1&$value2" }, windows)

    joined.foreach({ key, value ->
        log.info { "JOINED $key : $value" }
    })

现在我想知道 new/old 键由于某种原因在 时间 window 中丢失了。是否可以通过 KafkaStreams 实现 API?

在我的例子中,当收到密钥 "1-old" 并且 "1-new" 不在 2 分钟内时,仅在这种情况下我想报告 id 1 为可疑。

如果我对你的问题的理解正确,你只想在 2 分钟 window.[=11 内出现 "old" 而没有对应的 "new" 时将 id 报告为可疑=]

如果是这种情况,您需要使用左连接:

val leftJoined = oldStream.leftJoin(newStream,...).filter(condition where value expected from "new" stream is null);

HTH

DSL 可能无法满足您的需求。但是,您可以使用处理器 API。话虽如此,leftJoin实际上可以用来做"heavy lifting"。因此,在 leftJoin 之后,您可以使用带有附加状态的 .transform(...) 进一步 "clean up" 数据。

对于您收到的每条 old&null 记录,将其放入商店。如果您稍后收到 old&new,您可以将其从商店中移除。此外,您注册了一个标点符号,并且在每次标点符号调用时,您扫描存储以查找 "old enough" 的条目,因此您可以确定以后不会产生 old&new 连接结果。对于这些条目,您发出 old&null 并将它们从商店中移除。

作为替代方案,您也可以省略连接,并在单个 transform() with state 中完成所有操作。为此,您需要 KStream#merge() 旧流和新流并在合并流上调用 transform()

注意:除了注册标点符号,您还可以将 "scan logic" 放入转换中,并在每次处理记录时执行它。

看起来像您要找的东西。 Kafka Streams left outer join on timeout

Eliminates the lack of sql-like left join semantic in kafka streams framework. This implementation will generate left join event only if full join event didn't happen in join window duration interval.