Kafka Streams API:使用多个过滤器和一个默认主题
Kafka Streams API: Using multiple filters and a default topic
我正在使用 KStream filter
功能根据匹配的谓词写入特定主题:
val builder: KStreamBuilder = new KStreamBuilder()
val stream: KStream[String, String] = builder.stream(config)
val filter1 = stream.filter(predicate1)
val filter2 = stream.filter(predicate2)
filter1.to("out-topic-1")
filter2.to("out-topic-2")
new KafkaStream(builder, properties).start
我如何确保如果消息与这些过滤器中的任何一个都不匹配,它仍会写入默认主题?我试过为 "default" 流定义输出但没有任何运气:
stream.to("default-topic")
感谢您对此事的任何帮助。
您可以使用 KStream#branch
——它需要多个谓词,最后一个谓词可以无条件地 return true
。如果您使用 branch
一条记录将只包含在一个输出流中。上面的逻辑与 atm 略有不同,因为一条记录最终可能出现在两个输出流(filter1 和 filter2)中。如果您需要保留此 branch
将不起作用,但您可以添加以下内容以使第二种情况起作用:
val default = stream.filterNot(predicate1).filterNot(predicate2)
default.to("default-topic");
我正在使用 KStream filter
功能根据匹配的谓词写入特定主题:
val builder: KStreamBuilder = new KStreamBuilder()
val stream: KStream[String, String] = builder.stream(config)
val filter1 = stream.filter(predicate1)
val filter2 = stream.filter(predicate2)
filter1.to("out-topic-1")
filter2.to("out-topic-2")
new KafkaStream(builder, properties).start
我如何确保如果消息与这些过滤器中的任何一个都不匹配,它仍会写入默认主题?我试过为 "default" 流定义输出但没有任何运气:
stream.to("default-topic")
感谢您对此事的任何帮助。
您可以使用 KStream#branch
——它需要多个谓词,最后一个谓词可以无条件地 return true
。如果您使用 branch
一条记录将只包含在一个输出流中。上面的逻辑与 atm 略有不同,因为一条记录最终可能出现在两个输出流(filter1 和 filter2)中。如果您需要保留此 branch
将不起作用,但您可以添加以下内容以使第二种情况起作用:
val default = stream.filterNot(predicate1).filterNot(predicate2)
default.to("default-topic");