spring kafka stream 使用函数式风格消费来自多个主题的消息无效
spring kafka stream consume messages from multiple topics using functional style not working
我正在尝试使用单一绑定来处理来自多个主题的消息,而只是简单地打印。但出于某种原因,我只能从第一个主题中获取消息。我是否以不同方式指定了 属性。我关注了此博客 https://spring.io/blog/2019/12/03/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-2-programming-model-continued.
中的多重主题讨论
// My properties
spring.cloud.stream.bindings.routeRequests-in-0.destination=kafk.pds.orch.be-uda.complete,kafk.pds.orch.prov-uda.complete,kafk.pds.orch.location-uda.complete
spring.cloud.stream.bindings.routeRequests-in-0.consumer.use-native-decoding=false
spring.cloud.stream.kafka.streams.binder.functions.routeRequests.applicationId=kafk.pds.orch.stream.routeRequests
// My code
@Bean
public Consumer<KStream<String, String>> routeRequests() {
return uda -> uda.foreach((s, request) -> {
System.out.println("Hello:" + s);
});
我没有发现您的代码有任何问题。我刚刚使用示例应用程序验证了此功能是否有效。参见 here。
将此示例应用程序与您的应用程序进行比较。如果仍然无法正常工作,请随时分享可重现的样本,以便我们进一步分类。
顺便说一句 - 您不需要将 use-native-decoding
设置为 false
,除非您有特定的理由这样做。默认值为 true
(这意味着我们依赖 Kafka Streams 的 Serde
机制)。但是,这不是您的问题。我能够 运行 使用这两种设置的示例应用程序。
我正在尝试使用单一绑定来处理来自多个主题的消息,而只是简单地打印。但出于某种原因,我只能从第一个主题中获取消息。我是否以不同方式指定了 属性。我关注了此博客 https://spring.io/blog/2019/12/03/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-2-programming-model-continued.
中的多重主题讨论 // My properties
spring.cloud.stream.bindings.routeRequests-in-0.destination=kafk.pds.orch.be-uda.complete,kafk.pds.orch.prov-uda.complete,kafk.pds.orch.location-uda.complete
spring.cloud.stream.bindings.routeRequests-in-0.consumer.use-native-decoding=false
spring.cloud.stream.kafka.streams.binder.functions.routeRequests.applicationId=kafk.pds.orch.stream.routeRequests
// My code
@Bean
public Consumer<KStream<String, String>> routeRequests() {
return uda -> uda.foreach((s, request) -> {
System.out.println("Hello:" + s);
});
我没有发现您的代码有任何问题。我刚刚使用示例应用程序验证了此功能是否有效。参见 here。
将此示例应用程序与您的应用程序进行比较。如果仍然无法正常工作,请随时分享可重现的样本,以便我们进一步分类。
顺便说一句 - 您不需要将 use-native-decoding
设置为 false
,除非您有特定的理由这样做。默认值为 true
(这意味着我们依赖 Kafka Streams 的 Serde
机制)。但是,这不是您的问题。我能够 运行 使用这两种设置的示例应用程序。