Kafka Streams API:使用多个过滤器和一个默认主题

Kafka Streams API: Using multiple filters and a default topic

我正在使用 KStream filter 功能根据匹配的谓词写入特定主题:

val builder: KStreamBuilder = new KStreamBuilder()
val stream: KStream[String, String] = builder.stream(config)

val filter1 = stream.filter(predicate1)
val filter2 = stream.filter(predicate2)

filter1.to("out-topic-1")
filter2.to("out-topic-2")

new KafkaStream(builder, properties).start

我如何确保如果消息与这些过滤器中的任何一个都不匹配,它仍会写入默认主题?我试过为 "default" 流定义输出但没有任何运气:

stream.to("default-topic")

感谢您对此事的任何帮助。

您可以使用 KStream#branch——它需要多个谓词,最后一个谓词可以无条件地 return true。如果您使用 branch 一条记录将只包含在一个输出流中。上面的逻辑与 atm 略有不同,因为一条记录最终可能出现在两个输出流(filter1 和 filter2)中。如果您需要保留此 branch 将不起作用,但您可以添加以下内容以使第二种情况起作用:

val default = stream.filterNot(predicate1).filterNot(predicate2)
default.to("default-topic");