使用 AggregatingReplyingKafkaTemplate 处理多个响应
Handle multiple responses with AggregatingReplyingKafkaTemplate
我要解决的问题场景与
我知道 AggregatingReplyingKafkaTemplate(使用 spring-kafka-2.3.7)可以帮助解决这个问题,但是,我正在努力寻找正确的 bean 配置并获得一个我可以聚合结果的场景多个消费者。以下是我的 bean 配置,但是,我收到编译错误,提示无法解析 GenericMessageListenerContainer
的构造函数
@Bean
public AggregatingReplyingKafkaTemplate<String, Model, Model> replyKafkaTemplate(ProducerFactory<String, Model> pf,
KafkaMessageListenerContainer<String, Collection<ConsumerRecord<String, Model>>> replyContainer,
BiPredicate<List<ConsumerRecord<String, Model>>, Boolean> releaseStrategy) {
return new AggregatingReplyingKafkaTemplate<>(pf, replyContainer, releaseStrategy);
}
@Bean
public GenericMessageListenerContainer<String, Collection<ConsumerRecord<String, Model>>> replyContainer(ConsumerFactory<String, Model> cf) {
ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
return new KafkaMessageListenerContainer<String, Collection<ConsumerRecord<String, Model>>>(cf,
containerProperties);
}
关于如何使用 AggregatingReplyingKafkaTemplate 的任何简单示例都会有所帮助。
非常感谢,
getting compile error to say cannot resolve constructor for GenericMessageListenerContainer
在哪里?我没看到你在尝试构建一个。
您的 replyContainer
bean 应该 return KafkaMessageListenerContainer
鉴于它是 replyKafkaTemplate
工厂方法中声明的类型。
示例见我对 this question 的回答。
使用 ConcurrentKafkaListenerContainerFactory
创建容器最简单。如果你使用Spring启动,它会自动为你配置一个;如果不是,则需要将一个配置为 @Bean
我要解决的问题场景与
我知道 AggregatingReplyingKafkaTemplate(使用 spring-kafka-2.3.7)可以帮助解决这个问题,但是,我正在努力寻找正确的 bean 配置并获得一个我可以聚合结果的场景多个消费者。以下是我的 bean 配置,但是,我收到编译错误,提示无法解析 GenericMessageListenerContainer
的构造函数 @Bean
public AggregatingReplyingKafkaTemplate<String, Model, Model> replyKafkaTemplate(ProducerFactory<String, Model> pf,
KafkaMessageListenerContainer<String, Collection<ConsumerRecord<String, Model>>> replyContainer,
BiPredicate<List<ConsumerRecord<String, Model>>, Boolean> releaseStrategy) {
return new AggregatingReplyingKafkaTemplate<>(pf, replyContainer, releaseStrategy);
}
@Bean
public GenericMessageListenerContainer<String, Collection<ConsumerRecord<String, Model>>> replyContainer(ConsumerFactory<String, Model> cf) {
ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
return new KafkaMessageListenerContainer<String, Collection<ConsumerRecord<String, Model>>>(cf,
containerProperties);
}
关于如何使用 AggregatingReplyingKafkaTemplate 的任何简单示例都会有所帮助。
非常感谢,
getting compile error to say cannot resolve constructor for
GenericMessageListenerContainer
在哪里?我没看到你在尝试构建一个。
您的 replyContainer
bean 应该 return KafkaMessageListenerContainer
鉴于它是 replyKafkaTemplate
工厂方法中声明的类型。
示例见我对 this question 的回答。
使用 ConcurrentKafkaListenerContainerFactory
创建容器最简单。如果你使用Spring启动,它会自动为你配置一个;如果不是,则需要将一个配置为 @Bean