Spring 启动 Kafka 消费者

Spring boot Kafka consumer

使用spring集成Kafka dsl,请问为什么listener收不到消息?但是同一个应用程序,如果我用 KafkaListener 注释的方法替换 spring 集成 DSL,则可以很好地使用消息。 DSL 缺少什么?

不消耗的DSL代码:

@Configuration
@EnableKafka
class KafkaConfig {
    //consumer factory provided by Spring boot
    @Bean
    IntegrationFlow inboundKafkaEventFlow(ConsumerFactory consumerFactory) {
        IntegrationFlows
                .from(Kafka
                .messageDrivenChannelAdapter(consumerFactory, "kafkaTopic")
                .configureListenerContainer({ c -> c.groupId('kafka-consumer-staging') })
                .id("kafkaTopicListener").autoStartup(true)
        )

                .channel("logChannel")
                .get()
    }
}

logChannel(或我使用的任何其他渠道)不反映入站消息。

而不是上面的代码,如果我使用普通的监听器,它可以很好地使用消息。

@Component
class KafkaConsumer {
    @KafkaListener(topics = ['kafkaTopic'], groupId = 'kafka-consumer-staging')
    void inboundKafkaEvent(String message) {
        log.debug("message is {}", message)
    }
}

两种方法对 Kafka 消费者使用相同的 application.properties。

您忽略了使用 Spring 集成这一事实,但您尚未在您的应用程序中启用它。不过,您不需要为 Kafka 执行此操作,因为您不会使用 @KafkaListener 来使用它。因此,要启用 Spring 集成基础架构,您需要在 @Configuration class 上添加 @EnableIntegrationhttps://docs.spring.io/spring-integration/docs/5.1.6.RELEASE/reference/html/#configuration-enable-integration