Kafka Streams DSL 重试逻辑(写记录回主题)
Kafka Streams DSL retry logic (writing record back to topic)
在执行 KStream - KTable 左联接时,如果右 table 不匹配,我想将记录发送回同一主题,X 秒后。
这可以通过 DSL 实现吗?
我猜你可以
KStream[] streams = stream.leftJoin(table,...).branch(...);
stream[1].transform(...).to("input-topic");
您使用分支将连接的记录放入第一个流,将未连接的记录放入第二个流。第二个流通过管道传输到 transform()
,它使用状态存储来缓冲这些记录,您可以使用 punctuations
发送 context.forward()
它们,延迟 5 秒。
在执行 KStream - KTable 左联接时,如果右 table 不匹配,我想将记录发送回同一主题,X 秒后。
这可以通过 DSL 实现吗?
我猜你可以
KStream[] streams = stream.leftJoin(table,...).branch(...);
stream[1].transform(...).to("input-topic");
您使用分支将连接的记录放入第一个流,将未连接的记录放入第二个流。第二个流通过管道传输到 transform()
,它使用状态存储来缓冲这些记录,您可以使用 punctuations
发送 context.forward()
它们,延迟 5 秒。