Spring 启动 Kafka 消费者
Spring boot Kafka consumer
使用spring集成Kafka dsl,请问为什么listener收不到消息?但是同一个应用程序,如果我用 KafkaListener 注释的方法替换 spring 集成 DSL,则可以很好地使用消息。
DSL 缺少什么?
不消耗的DSL代码:
@Configuration
@EnableKafka
class KafkaConfig {
//consumer factory provided by Spring boot
@Bean
IntegrationFlow inboundKafkaEventFlow(ConsumerFactory consumerFactory) {
IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory, "kafkaTopic")
.configureListenerContainer({ c -> c.groupId('kafka-consumer-staging') })
.id("kafkaTopicListener").autoStartup(true)
)
.channel("logChannel")
.get()
}
}
logChannel(或我使用的任何其他渠道)不反映入站消息。
而不是上面的代码,如果我使用普通的监听器,它可以很好地使用消息。
@Component
class KafkaConsumer {
@KafkaListener(topics = ['kafkaTopic'], groupId = 'kafka-consumer-staging')
void inboundKafkaEvent(String message) {
log.debug("message is {}", message)
}
}
两种方法对 Kafka 消费者使用相同的 application.properties。
您忽略了使用 Spring 集成这一事实,但您尚未在您的应用程序中启用它。不过,您不需要为 Kafka 执行此操作,因为您不会使用 @KafkaListener
来使用它。因此,要启用 Spring 集成基础架构,您需要在 @Configuration
class 上添加 @EnableIntegration
:https://docs.spring.io/spring-integration/docs/5.1.6.RELEASE/reference/html/#configuration-enable-integration
使用spring集成Kafka dsl,请问为什么listener收不到消息?但是同一个应用程序,如果我用 KafkaListener 注释的方法替换 spring 集成 DSL,则可以很好地使用消息。 DSL 缺少什么?
不消耗的DSL代码:
@Configuration
@EnableKafka
class KafkaConfig {
//consumer factory provided by Spring boot
@Bean
IntegrationFlow inboundKafkaEventFlow(ConsumerFactory consumerFactory) {
IntegrationFlows
.from(Kafka
.messageDrivenChannelAdapter(consumerFactory, "kafkaTopic")
.configureListenerContainer({ c -> c.groupId('kafka-consumer-staging') })
.id("kafkaTopicListener").autoStartup(true)
)
.channel("logChannel")
.get()
}
}
logChannel(或我使用的任何其他渠道)不反映入站消息。
而不是上面的代码,如果我使用普通的监听器,它可以很好地使用消息。
@Component
class KafkaConsumer {
@KafkaListener(topics = ['kafkaTopic'], groupId = 'kafka-consumer-staging')
void inboundKafkaEvent(String message) {
log.debug("message is {}", message)
}
}
两种方法对 Kafka 消费者使用相同的 application.properties。
您忽略了使用 Spring 集成这一事实,但您尚未在您的应用程序中启用它。不过,您不需要为 Kafka 执行此操作,因为您不会使用 @KafkaListener
来使用它。因此,要启用 Spring 集成基础架构,您需要在 @Configuration
class 上添加 @EnableIntegration
:https://docs.spring.io/spring-integration/docs/5.1.6.RELEASE/reference/html/#configuration-enable-integration