Kafka - 如何同时使用 filter 和 filternot?

Kafka - How to use filter and filternot at the same time?

我有一个从主题获取数据的 Kafka 流,需要将该信息过滤到两个不同的主题。

KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic");
stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "good-topic");
stream.filterNot((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "bad-topic");

但是,当我这样做时,它会从主题中读取数据两次——不确定随着数据变大这是否会对性能产生任何影响。有没有办法只过滤一次然后推送到两个主题?

您的方法是正确的,数据未从主题中读取两次,并且没有内部数据复制正在进行。您的方法的唯一缺点是,对每条记录都评估了两个过滤器谓词——但是,这是非常便宜的,不应该是性能问题。

但是,您仍然可以通过使用 KStream#branch() 来提高性能,它确实采用多个谓词并依次评估所有谓词,并且 return 每个谓词都有一个输入流。如果记录与谓词匹配,则将其放入相应的输出流中并停止评估(即,不再为该单个记录评估进一步的谓词——这确保每个记录被添加到最大一个输出流;或者如果没有谓词匹配)。

因此,您可以只向 branch() 提供两个谓词:第一个与您原来的 filter() 谓词相同,第二个谓词始终 returns true.

KStream<String, Model> stream = builder.stream(
    Serdes.String(),
    specificAvroSerde,
    "not-filtered-topic"
);
KStream[] splitStreams = stream.branch(
    (key, value) -> new Processor().test(key,value),
    (key, value) -> true
);
splitStreams[0].to(Serdes.String(), specificAvroSerde, "good-topic");
splitStreams[1].to(Serdes.String(), specificAvroSerde, "bad-topic");

但不确定此代码是否比您的原始版本更易读。我想这是一个品味问题,我个人更喜欢你的原始代码,因为它确实更好地表达了语义。

我添加的版本应该稍微更CPU有效,因为对于所有满足谓词的记录,它只被评估一次。对于所有不满足结果的记录,一个简单的 true 将是 return(即没有第二个谓词评估)。

如果您知道大多数记录将在 splitStream[1] 中结束,您还可以反转谓词(并将 splitStream[0] 用作 "bad-stream")以减少对第二个 true-returning 谓词。但这些只是微优化,应该无关紧要。