使用 AggregatingReplyingKafkaTemplate 处理多个响应

Handle multiple responses with AggregatingReplyingKafkaTemplate

我要解决的问题场景与

我知道 AggregatingReplyingKafkaTemplate(使用 spring-kafka-2.3.7)可以帮助解决这个问题,但是,我正在努力寻找正确的 bean 配置并获得一个我可以聚合结果的场景多个消费者。以下是我的 bean 配置,但是,我收到编译错误,提示无法解析 GenericMessageListenerContainer

的构造函数
    @Bean
    public AggregatingReplyingKafkaTemplate<String, Model, Model> replyKafkaTemplate(ProducerFactory<String, Model> pf,
                                                                                     KafkaMessageListenerContainer<String, Collection<ConsumerRecord<String, Model>>> replyContainer,
                                                                                     BiPredicate<List<ConsumerRecord<String, Model>>, Boolean> releaseStrategy) {
        return new AggregatingReplyingKafkaTemplate<>(pf, replyContainer, releaseStrategy);
    }
@Bean
    public GenericMessageListenerContainer<String, Collection<ConsumerRecord<String, Model>>>  replyContainer(ConsumerFactory<String, Model> cf) {
        ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);
        return new KafkaMessageListenerContainer<String, Collection<ConsumerRecord<String, Model>>>(cf,
                containerProperties);
    }

关于如何使用 AggregatingReplyingKafkaTemplate 的任何简单示例都会有所帮助。

非常感谢,

getting compile error to say cannot resolve constructor for GenericMessageListenerContainer

在哪里?我没看到你在尝试构建一个。

您的 replyContainer bean 应该 return KafkaMessageListenerContainer 鉴于它是 replyKafkaTemplate 工厂方法中声明的类型。

示例见我对 this question 的回答。

使用 ConcurrentKafkaListenerContainerFactory 创建容器最简单。如果你使用Spring启动,它会自动为你配置一个;如果不是,则需要将一个配置为 @Bean