Spring 当应用程序关闭时,Kafka 提交重置偏移量不起作用
Spring Kafka commit reset offset not work when application goes down
如文档中所述,仅当我实际提交时(AckMode.MANUAL_IMMEDIATE 或 AckMode.MANUAL 时)或 AckMode.RECORD 时侦听器执行结束时才应提交偏移量,但是,在处理用@KafkaListener 注释的方法的过程中,应用程序关闭,消息不会重新传递,应用程序开始读取下一条有效消息并且当前消息丢失(应用程序正在处理的消息已重新启动),当应用程序在处理过程中重新启动时,如何实现应用程序重新处理未提交消息的目标?我还尝试将 AUTO_OFFSET_RESET_CONFIG 配置为最早、最新和 none,但在 3 个模型中均未成功。出于测试目的,我创建了一个只有一个分区的主题,我强制监听器使用我手动定义的容器工厂。
springboot-version 2.2.6
@Configuration
class KafkaTestConfiguration {
@Bean
fun producerFactory(): ProducerFactory<String, String> {
return DefaultKafkaProducerFactory(producerConfigs())
}
@Bean
fun consumerFactory(): ConsumerFactory<Any, Any> {
return DefaultKafkaConsumerFactory(consumerConfigs())
}
@Bean
fun producerConfigs(): Map<String, Any> {
val props: MutableMap<String, Any> = HashMap()
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9094"
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return props
}
@Bean
fun consumerConfigs(): Map<String, Any> {
val props: MutableMap<String, Any> = HashMap()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9094"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 20000
props[ConsumerConfig.GROUP_ID_CONFIG] = "kafka-retry"
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
return props
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<Any, Any> {
val factory: ConcurrentKafkaListenerContainerFactory<Any, Any> = ConcurrentKafkaListenerContainerFactory()
factory.consumerFactory = consumerFactory()
factory.consumerFactory.createConsumer()
val containerProperties = factory.containerProperties
containerProperties.isAckOnError = false
containerProperties.ackMode = AckMode.MANUAL_IMMEDIATE
containerProperties.commitLogLevel = LogIfLevelEnabled.Level.INFO
containerProperties.isLogContainerConfig = true
return factory
}
@Component
class KafkaListenerAck {
@KafkaListener(id = "listMsgAckConsumer", topics = ["kafkaListenerTest1"],
groupId = "kafka-retry",
concurrency = "1",
containerFactory = "kafkaListenerContainerFactory"
)
fun onMessage(data: ConsumerRecord<String, String>, acknowledgment: Acknowledgment?) {
println("listMsgAckConsumer1 - topic ${data.topic()} offset ${data.offset()} partition ${data.partition()} message ${data.value()}")
println("If stop container here, the next pool will not deliver the current unconfirmed message")
acknowledgment?.acknowledge()
}
}
在调用 acknowledgment.acknowledge()
之前不会提交偏移量。将 commitLogLevel
容器 属性 设置为 DEBUG
以查看提交 activity.
auto.offset.reset
仅适用于消费者从未提交偏移量的情况(仅限新消费者组)。
如果你从日志中看不出来;使用日志片段编辑问题。
如文档中所述,仅当我实际提交时(AckMode.MANUAL_IMMEDIATE 或 AckMode.MANUAL 时)或 AckMode.RECORD 时侦听器执行结束时才应提交偏移量,但是,在处理用@KafkaListener 注释的方法的过程中,应用程序关闭,消息不会重新传递,应用程序开始读取下一条有效消息并且当前消息丢失(应用程序正在处理的消息已重新启动),当应用程序在处理过程中重新启动时,如何实现应用程序重新处理未提交消息的目标?我还尝试将 AUTO_OFFSET_RESET_CONFIG 配置为最早、最新和 none,但在 3 个模型中均未成功。出于测试目的,我创建了一个只有一个分区的主题,我强制监听器使用我手动定义的容器工厂。 springboot-version 2.2.6
@Configuration
class KafkaTestConfiguration {
@Bean
fun producerFactory(): ProducerFactory<String, String> {
return DefaultKafkaProducerFactory(producerConfigs())
}
@Bean
fun consumerFactory(): ConsumerFactory<Any, Any> {
return DefaultKafkaConsumerFactory(consumerConfigs())
}
@Bean
fun producerConfigs(): Map<String, Any> {
val props: MutableMap<String, Any> = HashMap()
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9094"
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return props
}
@Bean
fun consumerConfigs(): Map<String, Any> {
val props: MutableMap<String, Any> = HashMap()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9094"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 20000
props[ConsumerConfig.GROUP_ID_CONFIG] = "kafka-retry"
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
return props
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<Any, Any> {
val factory: ConcurrentKafkaListenerContainerFactory<Any, Any> = ConcurrentKafkaListenerContainerFactory()
factory.consumerFactory = consumerFactory()
factory.consumerFactory.createConsumer()
val containerProperties = factory.containerProperties
containerProperties.isAckOnError = false
containerProperties.ackMode = AckMode.MANUAL_IMMEDIATE
containerProperties.commitLogLevel = LogIfLevelEnabled.Level.INFO
containerProperties.isLogContainerConfig = true
return factory
}
@Component
class KafkaListenerAck {
@KafkaListener(id = "listMsgAckConsumer", topics = ["kafkaListenerTest1"],
groupId = "kafka-retry",
concurrency = "1",
containerFactory = "kafkaListenerContainerFactory"
)
fun onMessage(data: ConsumerRecord<String, String>, acknowledgment: Acknowledgment?) {
println("listMsgAckConsumer1 - topic ${data.topic()} offset ${data.offset()} partition ${data.partition()} message ${data.value()}")
println("If stop container here, the next pool will not deliver the current unconfirmed message")
acknowledgment?.acknowledge()
}
}
在调用 acknowledgment.acknowledge()
之前不会提交偏移量。将 commitLogLevel
容器 属性 设置为 DEBUG
以查看提交 activity.
auto.offset.reset
仅适用于消费者从未提交偏移量的情况(仅限新消费者组)。
如果你从日志中看不出来;使用日志片段编辑问题。