有没有办法配置@KafkaListener 的轮询间隔?
Is there a way to configure polling interval of @KafkaListener?
我是 SpringBoot 和 Kafka 领域的新手。我看到定义消费者的常用方法是 @KafkaListener
。
在 psat 中,我配置了一个调度程序,每隔 X 次轮询一次 Kafka 代理,但在这里我没有找到如何指定此间隔或有关将数据从代理推送到消费者的文档。
有没有办法配置它(或者如果消费数据的方式是通过推送我会很乐意理解它)
它是 recently introduced 到 spring kafka,所以如果你使用的是现代版本,它应该是这样的:
build.gradle
implementation("org.springframework.kafka:spring-kafka:2.6.1") // or higher, minimum required is 2.3.0
Config.java
@Bean("singleKafkaListenerContainerFactoryManualCommit")
public ConcurrentKafkaListenerContainerFactory<?, ?> singleKafkaListenerContainerFactoryManualCommit(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setBatchListener(false);
factory.getContainerProperties().setIdleBetweenPolls(1000L); // <- this is basically it
return factory;
}
它的样子:
2020-11-27 17:04:41.099 INFO 67146 --- [ntainer#0-0-C-1] c.r.t.m.d.l.service.KafkaService : Consumed message
2020-11-27 17:04:42.143 INFO 67146 --- [ntainer#0-0-C-1] c.r.t.m.d.l.service.KafkaService : Consumed message
2020-11-27 17:04:43.185 INFO 67146 --- [ntainer#0-0-C-1] c.r.t.m.d.l.service.KafkaService : Consumed message
2020-11-27 17:04:44.223 INFO 67146 --- [ntainer#0-0-C-1] c.r.t.m.d.l.service.KafkaService : Consumed message
请记住,您需要 kafka 代理版本 >= 1.0 才能使其正常工作。
我是 SpringBoot 和 Kafka 领域的新手。我看到定义消费者的常用方法是 @KafkaListener
。
在 psat 中,我配置了一个调度程序,每隔 X 次轮询一次 Kafka 代理,但在这里我没有找到如何指定此间隔或有关将数据从代理推送到消费者的文档。
有没有办法配置它(或者如果消费数据的方式是通过推送我会很乐意理解它)
它是 recently introduced 到 spring kafka,所以如果你使用的是现代版本,它应该是这样的:
build.gradle
implementation("org.springframework.kafka:spring-kafka:2.6.1") // or higher, minimum required is 2.3.0
Config.java
@Bean("singleKafkaListenerContainerFactoryManualCommit")
public ConcurrentKafkaListenerContainerFactory<?, ?> singleKafkaListenerContainerFactoryManualCommit(ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setBatchListener(false);
factory.getContainerProperties().setIdleBetweenPolls(1000L); // <- this is basically it
return factory;
}
它的样子:
2020-11-27 17:04:41.099 INFO 67146 --- [ntainer#0-0-C-1] c.r.t.m.d.l.service.KafkaService : Consumed message
2020-11-27 17:04:42.143 INFO 67146 --- [ntainer#0-0-C-1] c.r.t.m.d.l.service.KafkaService : Consumed message
2020-11-27 17:04:43.185 INFO 67146 --- [ntainer#0-0-C-1] c.r.t.m.d.l.service.KafkaService : Consumed message
2020-11-27 17:04:44.223 INFO 67146 --- [ntainer#0-0-C-1] c.r.t.m.d.l.service.KafkaService : Consumed message
请记住,您需要 kafka 代理版本 >= 1.0 才能使其正常工作。