Kafka - 如何同时使用 filter 和 filternot?
Kafka - How to use filter and filternot at the same time?
我有一个从主题获取数据的 Kafka 流,需要将该信息过滤到两个不同的主题。
KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic");
stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "good-topic");
stream.filterNot((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "bad-topic");
但是,当我这样做时,它会从主题中读取数据两次——不确定随着数据变大这是否会对性能产生任何影响。有没有办法只过滤一次然后推送到两个主题?
您的方法是正确的,数据未从主题中读取两次,并且没有内部数据复制正在进行。您的方法的唯一缺点是,对每条记录都评估了两个过滤器谓词——但是,这是非常便宜的,不应该是性能问题。
但是,您仍然可以通过使用 KStream#branch()
来提高性能,它确实采用多个谓词并依次评估所有谓词,并且 return 每个谓词都有一个输入流。如果记录与谓词匹配,则将其放入相应的输出流中并停止评估(即,不再为该单个记录评估进一步的谓词——这确保每个记录被添加到最大一个输出流;或者如果没有谓词匹配)。
因此,您可以只向 branch()
提供两个谓词:第一个与您原来的 filter()
谓词相同,第二个谓词始终 returns true
.
KStream<String, Model> stream = builder.stream(
Serdes.String(),
specificAvroSerde,
"not-filtered-topic"
);
KStream[] splitStreams = stream.branch(
(key, value) -> new Processor().test(key,value),
(key, value) -> true
);
splitStreams[0].to(Serdes.String(), specificAvroSerde, "good-topic");
splitStreams[1].to(Serdes.String(), specificAvroSerde, "bad-topic");
但不确定此代码是否比您的原始版本更易读。我想这是一个品味问题,我个人更喜欢你的原始代码,因为它确实更好地表达了语义。
我添加的版本应该稍微更CPU有效,因为对于所有满足谓词的记录,它只被评估一次。对于所有不满足结果的记录,一个简单的 true
将是 return(即没有第二个谓词评估)。
如果您知道大多数记录将在 splitStream[1]
中结束,您还可以反转谓词(并将 splitStream[0]
用作 "bad-stream")以减少对第二个 true
-returning 谓词。但这些只是微优化,应该无关紧要。
我有一个从主题获取数据的 Kafka 流,需要将该信息过滤到两个不同的主题。
KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic");
stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "good-topic");
stream.filterNot((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "bad-topic");
但是,当我这样做时,它会从主题中读取数据两次——不确定随着数据变大这是否会对性能产生任何影响。有没有办法只过滤一次然后推送到两个主题?
您的方法是正确的,数据未从主题中读取两次,并且没有内部数据复制正在进行。您的方法的唯一缺点是,对每条记录都评估了两个过滤器谓词——但是,这是非常便宜的,不应该是性能问题。
但是,您仍然可以通过使用 KStream#branch()
来提高性能,它确实采用多个谓词并依次评估所有谓词,并且 return 每个谓词都有一个输入流。如果记录与谓词匹配,则将其放入相应的输出流中并停止评估(即,不再为该单个记录评估进一步的谓词——这确保每个记录被添加到最大一个输出流;或者如果没有谓词匹配)。
因此,您可以只向 branch()
提供两个谓词:第一个与您原来的 filter()
谓词相同,第二个谓词始终 returns true
.
KStream<String, Model> stream = builder.stream(
Serdes.String(),
specificAvroSerde,
"not-filtered-topic"
);
KStream[] splitStreams = stream.branch(
(key, value) -> new Processor().test(key,value),
(key, value) -> true
);
splitStreams[0].to(Serdes.String(), specificAvroSerde, "good-topic");
splitStreams[1].to(Serdes.String(), specificAvroSerde, "bad-topic");
但不确定此代码是否比您的原始版本更易读。我想这是一个品味问题,我个人更喜欢你的原始代码,因为它确实更好地表达了语义。
我添加的版本应该稍微更CPU有效,因为对于所有满足谓词的记录,它只被评估一次。对于所有不满足结果的记录,一个简单的 true
将是 return(即没有第二个谓词评估)。
如果您知道大多数记录将在 splitStream[1]
中结束,您还可以反转谓词(并将 splitStream[0]
用作 "bad-stream")以减少对第二个 true
-returning 谓词。但这些只是微优化,应该无关紧要。