@RetryableTopic 在使用 topicPartitions 重置偏移量时表现出奇怪的行为 - spring kafka

@RetryableTopic showing weird behaviour when using with topicPartitions to reset offset - spring kafka

我正在尝试使用 @RetryableTopic 来取消阻塞重试和 topicPartitions 以便从头开始读取消息。

下面是我的监听器(我只有一个分区):

@Slf4j
@Component
public class SingleTopicRetryConsumer {

    @RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = 1000),
            fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
    @KafkaListener(topicPartitions = {@TopicPartition(topic = "products",
            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
    public void listen(ConsumerRecord<String, String> message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        log.info("message consumed - \nkey: {} , \nvalue: {}, \ntopic: {}, \nat: {}",
                message.key(),
                message.value(),
                message.topic(),
                LocalDateTime.now());
    }

    @DltHandler
    public void dltListener(ConsumerRecord<String, String> message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("message consumed at DLT - \nkey: {} , \nvalue: {}, \ntopic: {}, \nat: {}",
                message.key(),
                message.value(),
                message.topic(),
                LocalDateTime.now());
    }
}

配置属性:

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

上面的代码开始发出奇怪的行为,它两次从主侦听器读取相同的消息,一次从 DLT 读取相同的消息,但仅从主要主题读取。

日志:

15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#8-0-C-1] INFO  c.m.s.c.n.SingleTopicRetryConsumer 
- message consumed - 
key: product1 , 
value: This is Product1, 
topic: products, 
at: 2022-04-07T15:10:50.950810          

15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#9-retry-0-C-1] INFO  c.m.s.c.n.SingleTopicRetryConsumer - 
message consumed - 
key: product1 , 
value: This is Product1, 
topic: products, 
at: 2022-04-07T15:10:50.950810      

15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#10-dlt-0-C-1] INFO  c.m.s.c.n.SingleTopicRetryConsumer - 
message consumed at DLT - 
key: product1 , 
value: This is Product1, 
topic: products, 
at: 2022-04-07T15:10:50.950810  

如果我在没有 topicPartitions 的情况下通过删除下面的行来使用上面的代码,侦听器将按预期工作。

            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))}

关于为什么会发生这种情况的任何线索?

更新:此错误已在 Spring 中修复,适用于 Apache Kafka 2.8.5

这是一个错误。问题是我们将重试主题名称设置为端点的 topics 属性,而不是将其设置为 topicPartition。因此,我们最终为主要端点提供了两个侦听器,为重试主题提供了 none。

如果可以,请打开一个问题:https://github.com/spring-projects/spring-kafka/issues

不确定是否有使用主题分区解决此问题的解决方法 - 它应该在几周内在 2.8.5 版本中修复。

感谢报告。