合并多个相同的 Kafka Streams 主题

Merging multiple identical Kafka Streams topics

我有 2 个 Kafka 主题流式传输来自不同来源的完全相同的内容,因此我可以在其中一个来源出现故障时保持高可用性。 我正在尝试使用 Kafka Streams 0.10.1.0 将 2 个主题合并为 1 个输出主题,这样我就不会错过任何有关失败的消息,并且在所有源都启动时不会出现重复。

当使用KStream的leftJoin方法时,其中一个topic可以down掉没有问题(secondary topic),但是当primary topic down掉时,没有任何东西发送到output topic。这似乎是因为,根据 Kafka Streams developer guide

KStream-KStream leftJoin is always driven by records arriving from the primary stream

所以如果没有来自主流的记录,它不会使用来自次流的记录,即使它们存在。一旦主流重新联机,输出将恢复正常。

我也尝试过使用 outerJoin(添加重复记录),然后转换为 KTable 和 groupByKey 以消除重复项,

KStream mergedStream = stream1.outerJoin(stream2, 
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
    JoinWindows.of(2000L))

mergedStream.groupByKey()
            .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore))
            .toStream((key,value) -> value)
            .to(outputStream)

但我偶尔还是会收到重复的。我还使用 commit.interval.ms=200 让 KTable 足够频繁地发送到输出流。

处理此合并以从多个相同的输入主题获得恰好一次输出的最佳方法是什么?

使用任何类型的联接都不能解决您的问题,因为您总是会以丢失结果(内部联接以防某些流停止)或 "duplicates" 与 null(左-join 或 outer-join,以防两个流都在线)。有关 Kafka Streams 中连接语义的详细信息,请参阅 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics

因此,我建议使用处理器 API,您可以使用 KStream process()transform() 或 [=14] 与 DSL 混合搭配=].有关详细信息,请参阅

您还可以将自定义存储添加到您的处理器 () 以使重复过滤容错。