Spring 当应用程序关闭时,Kafka 提交重置偏移量不起作用

Spring Kafka commit reset offset not work when application goes down

如文档中所述,仅当我实际提交时(AckMode.MANUAL_IMMEDIATE 或 AckMode.MANUAL 时)或 AckMode.RECORD 时侦听器执行结束时才应提交偏移量,但是,在处理用@KafkaListener 注释的方法的过程中,应用程序关闭,消息不会重新传递,应用程序开始读取下一条有效消息并且当前消息丢失(应用程序正在处理的消息已重新启动),当应用程序在处理过程中重新启动时,如何实现应用程序重新处理未提交消息的目标?我还尝试将 AUTO_OFFSET_RESET_CONFIG 配置为最早、最新和 none,但在 3 个模型中均未成功。出于测试目的,我创建了一个只有一个分区的主题,我强制监听器使用我手动定义的容器工厂。 springboot-version 2.2.6

    @Configuration
    class KafkaTestConfiguration {

        @Bean
        fun producerFactory(): ProducerFactory<String, String> {
            return DefaultKafkaProducerFactory(producerConfigs())
        }

        @Bean
        fun consumerFactory(): ConsumerFactory<Any, Any> {
            return DefaultKafkaConsumerFactory(consumerConfigs())
        }

        @Bean
        fun producerConfigs(): Map<String, Any> {
            val props: MutableMap<String, Any> = HashMap()
            props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9094"
            props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            return props
        }

        @Bean
        fun consumerConfigs(): Map<String, Any> {
            val props: MutableMap<String, Any> = HashMap()
            props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9094"
            props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
            props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
            props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 20000
            props[ConsumerConfig.GROUP_ID_CONFIG] = "kafka-retry"
            props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
            props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
            return props
        }

        @Bean
        fun kafkaTemplate(): KafkaTemplate<String, String> {
            return KafkaTemplate(producerFactory())
        }

  @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<Any, Any> {
        val factory: ConcurrentKafkaListenerContainerFactory<Any, Any> = ConcurrentKafkaListenerContainerFactory()
        factory.consumerFactory = consumerFactory()
        factory.consumerFactory.createConsumer()
        val containerProperties = factory.containerProperties
        containerProperties.isAckOnError = false
        containerProperties.ackMode = AckMode.MANUAL_IMMEDIATE
        containerProperties.commitLogLevel = LogIfLevelEnabled.Level.INFO
        containerProperties.isLogContainerConfig = true
        return factory
    }

    @Component
    class KafkaListenerAck {
        @KafkaListener(id = "listMsgAckConsumer", topics = ["kafkaListenerTest1"],
                groupId = "kafka-retry",
                concurrency = "1",
                containerFactory = "kafkaListenerContainerFactory"
        )
        fun onMessage(data: ConsumerRecord<String, String>, acknowledgment: Acknowledgment?) {
            println("listMsgAckConsumer1 - topic ${data.topic()} offset ${data.offset()} partition ${data.partition()} message ${data.value()}")
            println("If stop container here, the next pool will not deliver the current unconfirmed message")
            acknowledgment?.acknowledge()
        }
    }

在调用 acknowledgment.acknowledge() 之前不会提交偏移量。将 commitLogLevel 容器 属性 设置为 DEBUG 以查看提交 activity.

auto.offset.reset 仅适用于消费者从未提交偏移量的情况(仅限新消费者组)。

如果你从日志中看不出来;使用日志片段编辑问题。