KStream 过滤器会消耗每条消息吗?
Does KStream filter consume every message?
我过去使用过 Kafka,但从未使用过流 API。我的任务是构建一个可扩展的服务,该服务接受 websocket 连接并根据用户 ID 将出站消息从中心主题路由到正确的会话。
使用 KStream 这看起来简单得可笑。来自 one online tutorial:
builder.stream(inputTopic, Consumed.with(Serdes.String(), publicationSerde))
.filter((name, publication) -> "George R. R. Martin".equals(publication.getName()))
.to(outputTopic, Produced.with(Serdes.String(), publicationSerde));
但是过滤器命令是否会使用主题中的每条消息并在应用程序中执行过滤器 space?或者 KStream filter(Predicate super K,? super V> predicate) 是否包含进入 Kafka 内部工作的钩子,只允许它接收匹配正确的消息钥匙?
KStream javadoc 上的措辞似乎暗示了前者:“逐条消息消费。”
如果过滤器的唯一目的是使用主题的每条消息并丢弃不相关的消息,我可以手动完成。
你是对的 - 消息需要反序列化,然后根据谓词进行检查(在应用程序 space 中)
throw away those that are not relevant, I could do that by hand
当然可以,但是 Kafka Streams 有对 defining session windows 有用的方法。另外,您无需定义消费者和生产者实例即可转发到新主题。
我过去使用过 Kafka,但从未使用过流 API。我的任务是构建一个可扩展的服务,该服务接受 websocket 连接并根据用户 ID 将出站消息从中心主题路由到正确的会话。
使用 KStream
builder.stream(inputTopic, Consumed.with(Serdes.String(), publicationSerde))
.filter((name, publication) -> "George R. R. Martin".equals(publication.getName()))
.to(outputTopic, Produced.with(Serdes.String(), publicationSerde));
但是过滤器命令是否会使用主题中的每条消息并在应用程序中执行过滤器 space?或者 KStream
KStream
如果过滤器的唯一目的是使用主题的每条消息并丢弃不相关的消息,我可以手动完成。
你是对的 - 消息需要反序列化,然后根据谓词进行检查(在应用程序 space 中)
throw away those that are not relevant, I could do that by hand
当然可以,但是 Kafka Streams 有对 defining session windows 有用的方法。另外,您无需定义消费者和生产者实例即可转发到新主题。