End-of-window 与 KafkaStreams 的外部连接
End-of-window outer join with KafkaStreams
我有一个 Kafka 主题,我希望消息具有两种不同的密钥类型:旧的和新的。
即 "1-new"
、"1-old"
、"2-new"
、"2-old"
。密钥是唯一的,但有些可能会丢失。
现在使用 Kotlin 和 KafkaStreams API 我可以记录新旧密钥 ID 相同的消息。
val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())
val newStream = stream.filter({ key, _ -> isNew(key) })
.map({key, value -> KeyValue(key.replace(NEW_PREFIX, ""), value) })
val oldStream = stream.filter({ key, _ -> isOld(key) })
.map({key, value -> KeyValue(key.replace(OLD_PREFIX, ""), value) })
val joined = newStream.join(oldStream,
{ value1, value2 -> "$value1&$value2" }, windows)
joined.foreach({ key, value ->
log.info { "JOINED $key : $value" }
})
现在我想知道 new/old 键由于某种原因在 时间 window 中丢失了。是否可以通过 KafkaStreams 实现 API?
在我的例子中,当收到密钥 "1-old"
并且 "1-new"
不在 2 分钟内时,仅在这种情况下我想报告 id 1
为可疑。
如果我对你的问题的理解正确,你只想在 2 分钟 window.[=11 内出现 "old" 而没有对应的 "new" 时将 id 报告为可疑=]
如果是这种情况,您需要使用左连接:
val leftJoined = oldStream.leftJoin(newStream,...).filter(condition where value expected from "new" stream is null);
HTH
DSL 可能无法满足您的需求。但是,您可以使用处理器 API。话虽如此,leftJoin
实际上可以用来做"heavy lifting"。因此,在 leftJoin
之后,您可以使用带有附加状态的 .transform(...)
进一步 "clean up" 数据。
对于您收到的每条 old&null
记录,将其放入商店。如果您稍后收到 old&new
,您可以将其从商店中移除。此外,您注册了一个标点符号,并且在每次标点符号调用时,您扫描存储以查找 "old enough" 的条目,因此您可以确定以后不会产生 old&new
连接结果。对于这些条目,您发出 old&null
并将它们从商店中移除。
作为替代方案,您也可以省略连接,并在单个 transform()
with state 中完成所有操作。为此,您需要 KStream#merge()
旧流和新流并在合并流上调用 transform()
。
注意:除了注册标点符号,您还可以将 "scan logic" 放入转换中,并在每次处理记录时执行它。
看起来像您要找的东西。 Kafka Streams left outer join on timeout
Eliminates the lack of sql-like left join semantic in kafka streams framework. This implementation will generate left join event only if full join event didn't happen in join window duration interval.
我有一个 Kafka 主题,我希望消息具有两种不同的密钥类型:旧的和新的。
即 "1-new"
、"1-old"
、"2-new"
、"2-old"
。密钥是唯一的,但有些可能会丢失。
现在使用 Kotlin 和 KafkaStreams API 我可以记录新旧密钥 ID 相同的消息。
val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())
val newStream = stream.filter({ key, _ -> isNew(key) })
.map({key, value -> KeyValue(key.replace(NEW_PREFIX, ""), value) })
val oldStream = stream.filter({ key, _ -> isOld(key) })
.map({key, value -> KeyValue(key.replace(OLD_PREFIX, ""), value) })
val joined = newStream.join(oldStream,
{ value1, value2 -> "$value1&$value2" }, windows)
joined.foreach({ key, value ->
log.info { "JOINED $key : $value" }
})
现在我想知道 new/old 键由于某种原因在 时间 window 中丢失了。是否可以通过 KafkaStreams 实现 API?
在我的例子中,当收到密钥 "1-old"
并且 "1-new"
不在 2 分钟内时,仅在这种情况下我想报告 id 1
为可疑。
如果我对你的问题的理解正确,你只想在 2 分钟 window.[=11 内出现 "old" 而没有对应的 "new" 时将 id 报告为可疑=]
如果是这种情况,您需要使用左连接:
val leftJoined = oldStream.leftJoin(newStream,...).filter(condition where value expected from "new" stream is null);
HTH
DSL 可能无法满足您的需求。但是,您可以使用处理器 API。话虽如此,leftJoin
实际上可以用来做"heavy lifting"。因此,在 leftJoin
之后,您可以使用带有附加状态的 .transform(...)
进一步 "clean up" 数据。
对于您收到的每条 old&null
记录,将其放入商店。如果您稍后收到 old&new
,您可以将其从商店中移除。此外,您注册了一个标点符号,并且在每次标点符号调用时,您扫描存储以查找 "old enough" 的条目,因此您可以确定以后不会产生 old&new
连接结果。对于这些条目,您发出 old&null
并将它们从商店中移除。
作为替代方案,您也可以省略连接,并在单个 transform()
with state 中完成所有操作。为此,您需要 KStream#merge()
旧流和新流并在合并流上调用 transform()
。
注意:除了注册标点符号,您还可以将 "scan logic" 放入转换中,并在每次处理记录时执行它。
看起来像您要找的东西。 Kafka Streams left outer join on timeout
Eliminates the lack of sql-like left join semantic in kafka streams framework. This implementation will generate left join event only if full join event didn't happen in join window duration interval.