spring kafka consumer @retryabletopic 无限重试

Infinite retry in spring kafka consumer @retryabletopic

我正在使用@RetryableTopic 在kafka 消费者中实现重试逻辑。 我给出的配置如下:

@RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = 300000, multiplier = 10.0),
            autoCreateTopics = "false",
            topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
    )

但是,它不是重试 4 次,而是无限次重试,而且没有延迟时间。有人可以帮我处理代码吗? 我希望消息重试 4 次,第一次延迟 - 5 分钟后,然后 10 分钟后第二次延迟,20 分钟后第三次延迟...

代码如下:

int i = 1;

@RetryableTopic(
        attempts = "4",
        backoff = @Backoff(delay = 300000, multiplier = 10.0),
        autoCreateTopics = "false",
        topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE
)
@KafkaListener(topics = "topic_string_data", containerFactory = "default")
public void consume(@Payload String message , @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    String prachi = null;
    System.out.println("current time: " + new Date());
    System.out.println("retry method invoked -> " + i++ + " times from topic: " + topic);
    System.out.println("current time: " + new Date());
    prachi.equals("abc");
}

@DltHandler
public void listenDlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                      @Header(KafkaHeaders.OFFSET) long offset) {
    System.out.println("current time dlt: " + new Date());
    System.out.println("DLT Received: " + in + " from " + topic + " offset " + offset + " -> " + i++ + " times");
    System.out.println("current time dlt: " + new Date());
    //dump event to dlt queue
}

卡夫卡配置:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "grp_STRING");
    return new DefaultKafkaConsumerFactory<>(config);

    //inject consumer factory to kafka listener consumer factory
}

@Bean(name = "default")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

运行 应用时的日志:这不是完整的日志:

32:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.186  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Setting offset for partition topic_string_data-retry-1-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.187  INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.187  INFO 96675 --- [3-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-retry-0-0]
    2022-02-23 13:58:40.187  INFO 96675 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-2, groupId=grp_STRING] Setting offset for partition topic_string_data-1 to the committed offset FetchPosition{offset=27, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.187  INFO 96675 --- [4-retry-1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-retry-1-0]
    2022-02-23 13:58:40.187  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Setting offset for partition topic_string_data-retry-2-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.188  INFO 96675 --- [ntainer#2-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-2, groupId=grp_STRING] Setting offset for partition topic_string_data-0 to the committed offset FetchPosition{offset=24, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.188  INFO 96675 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-4, groupId=grp_STRING] Setting offset for partition topic_string-1 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.188  INFO 96675 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-4, groupId=grp_STRING] Setting offset for partition topic_string-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.188  INFO 96675 --- [5-retry-2-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-retry-2-0]
    2022-02-23 13:58:40.188  INFO 96675 --- [ntainer#2-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-1, topic_string_data-0]
    2022-02-23 13:58:40.189  INFO 96675 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string-1, topic_string-0]
    2022-02-23 13:58:40.188  INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-2 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.190  INFO 96675 --- [ner#6-dlt-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-grp_STRING-7, groupId=grp_STRING] Setting offset for partition topic_string_data-dlt-0 to the committed offset FetchPosition{offset=14, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[197mumnb29632:9092 (id: 0 rack: null)], epoch=0}}
    2022-02-23 13:58:40.191  INFO 96675 --- [ner#6-dlt-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : grp_STRING: partitions assigned: [topic_string_data-dlt-0, topic_string_data-dlt-1, topic_string_data-dlt-2]
    2022-02-23 13:58:40.196  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 1 for partition topic_string_data-retry-1-0
    2022-02-23 13:58:40.196  WARN 96675 --- [4-retry-1-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic_string_data-retry-1 is not ready for consumption, backing off for approx. 26346 millis.
    current time: Wed Feb 23 13:58:40 IST 2022
    retry method invoked -> 3 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:40 IST 2022
    2022-02-23 13:58:40.713  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:40 IST 2022
    retry method invoked -> 4 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:40 IST 2022
    2022-02-23 13:58:41.228  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:41 IST 2022
    retry method invoked -> 5 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:41 IST 2022
    2022-02-23 13:58:41.740  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:41 IST 2022
    retry method invoked -> 6 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:41 IST 2022
    2022-02-23 13:58:42.254  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:42 IST 2022
    retry method invoked -> 7 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:42 IST 2022
    2022-02-23 13:58:42.777  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:42 IST 2022
    retry method invoked -> 8 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:42 IST 2022
    2022-02-23 13:58:43.298  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:43 IST 2022
    retry method invoked -> 9 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:43 IST 2022
    2022-02-23 13:58:43.809  INFO 96675 --- [3-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-3, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-0-0
    current time: Wed Feb 23 13:58:43 IST 2022
    retry method invoked -> 10 times from topic: topic_string_data-retry-0
    current time: Wed Feb 23 13:58:43 IST 2022
    current time: Wed Feb 23 13:59:10 IST 2022
    retry method invoked -> 11 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:10 IST 2022
    2022-02-23 13:59:10.733  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-1-0
    2022-02-23 13:59:10.736  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 1 for partition topic_string_data-retry-2-0
    2022-02-23 13:59:10.737  WARN 96675 --- [5-retry-2-0-C-1] essageListenerContainer$ListenerConsumer : Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.kafka.listener.KafkaBackoffException: Partition 0 from topic topic_string_data-retry-2 is not ready for consumption, backing off for approx. 29483 millis.
    current time: Wed Feb 23 13:59:10 IST 2022
    retry method invoked -> 12 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:10 IST 2022
    2022-02-23 13:59:11.249  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:11 IST 2022
    retry method invoked -> 13 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:11 IST 2022
    2022-02-23 13:59:11.769  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:11 IST 2022
    retry method invoked -> 14 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:11 IST 2022
    2022-02-23 13:59:12.286  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:12 IST 2022
    retry method invoked -> 15 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:12 IST 2022
    2022-02-23 13:59:12.805  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:12 IST 2022
    retry method invoked -> 16 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:12 IST 2022
    2022-02-23 13:59:13.339  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:13 IST 2022
    retry method invoked -> 17 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:13 IST 2022
    2022-02-23 13:59:13.856  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:13 IST 2022
    retry method invoked -> 18 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:13 IST 2022
    2022-02-23 13:59:14.372  INFO 96675 --- [4-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-6, groupId=grp_STRING] Seeking to offset 9 for partition topic_string_data-retry-1-0
    current time: Wed Feb 23 13:59:14 IST 2022
    retry method invoked -> 19 times from topic: topic_string_data-retry-1
    current time: Wed Feb 23 13:59:14 IST 2022
    current time: Wed Feb 23 13:59:40 IST 2022
    retry method invoked -> 20 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:40 IST 2022
    2022-02-23 13:59:40.846  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 2 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:40 IST 2022
    DLT Received: prachi from topic_string_data-dlt offset 14 -> 21 times
    current time dlt: Wed Feb 23 13:59:46 IST 2022
    current time: Wed Feb 23 13:59:46 IST 2022
    retry method invoked -> 22 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:46 IST 2022
    2022-02-23 13:59:47.466  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 3 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:47 IST 2022
    DLT Received: prachi from topic_string_data-dlt offset 15 -> 23 times
    current time dlt: Wed Feb 23 13:59:47 IST 2022
    current time: Wed Feb 23 13:59:47 IST 2022
    retry method invoked -> 24 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:47 IST 2022
    2022-02-23 13:59:47.981  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 4 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:47 IST 2022
    DLT Received: prachisharma from topic_string_data-dlt offset 16 -> 25 times
    current time dlt: Wed Feb 23 13:59:47 IST 2022
    current time: Wed Feb 23 13:59:47 IST 2022
    retry method invoked -> 26 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:47 IST 2022
    2022-02-23 13:59:48.493  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 5 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:48 IST 2022
    DLT Received: hie from topic_string_data-dlt offset 17 -> 27 times
    current time dlt: Wed Feb 23 13:59:48 IST 2022
    current time: Wed Feb 23 13:59:48 IST 2022
    retry method invoked -> 28 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:48 IST 2022
    2022-02-23 13:59:49.011  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 6 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:49 IST 2022
    DLT Received: hieeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee from topic_string_data-dlt offset 18 -> 29 times
    current time dlt: Wed Feb 23 13:59:49 IST 2022
    current time: Wed Feb 23 13:59:49 IST 2022
    retry method invoked -> 30 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:49 IST 2022
    2022-02-23 13:59:49.527  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 7 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:49 IST 2022
    DLT Received: hie from topic_string_data-dlt offset 19 -> 31 times
    current time dlt: Wed Feb 23 13:59:49 IST 2022
    current time: Wed Feb 23 13:59:49 IST 2022
    retry method invoked -> 32 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:49 IST 2022
    current time dlt: Wed Feb 23 13:59:50 IST 2022
    DLT Received: hi from topic_string_data-dlt offset 20 -> 33 times
    current time dlt: Wed Feb 23 13:59:50 IST 2022
    2022-02-23 13:59:50.039  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 8 for partition topic_string_data-retry-2-0
    current time: Wed Feb 23 13:59:50 IST 2022
    retry method invoked -> 34 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:50 IST 2022
    2022-02-23 13:59:50.545  INFO 96675 --- [5-retry-2-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-grp_STRING-1, groupId=grp_STRING] Seeking to offset 9 for partition topic_string_data-retry-2-0
    current time dlt: Wed Feb 23 13:59:50 IST 2022
    DLT Received: hi from topic_string_data-dlt offset 21 -> 35 times
    current time dlt: Wed Feb 23 13:59:50 IST 2022
    current time: Wed Feb 23 13:59:50 IST 2022
    retry method invoked -> 36 times from topic: topic_string_data-retry-2
    current time: Wed Feb 23 13:59:50 IST 2022
    current time dlt: Wed Feb 23 13:59:51 IST 2022
    DLT Received: hi from topic_string_data-dlt offset 22 -> 37 times
    current time dlt: Wed Feb 23 13:59:51 IST 2022

似乎有两个不同的问题。

一个是您似乎已经在主题中有记录,如果您将其配置为 earliest 应用程序将在启动时读取所有这些记录。您可以将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为 latest,或者,如果您在 docker 上本地 运行,您可以停止 Kafka 容器并使用类似 docker system prune --volumes(请注意,这将清除所有已停止容器中的数据 - 明智地使用)。

你能尝试其中一个并再次测试吗?

另一个问题是框架错误地将默认的 maxDelay 设置为 30 秒,即使注释声明默认为忽略。我会为此打开一个问题并在此处添加 link。

目前您可以设置一个 maxDelay,例如 @Backoff(delay = 600000, multiplier = 3.0, maxDelay = 5400000),然后应用程序应该有您想要的 10、30 和 90 分钟的正确延迟。

让我知道这是否适合您,或者如果您有与此问题相关的任何其他问题。

编辑:问题已打开,您可以在那里关注开发 https://github.com/spring-projects/spring-kafka/issues/2137

应该会在下个版本中修复。

编辑 2:实际上 @BackOff 注释中的措辞相当含糊,但似乎行为是正确的,您应该明确设置更大的 maxDelay。

文档应在下一版本中阐明此行为。

编辑 3:要在评论中回答您的问题,可重试主题的工作方式是分区在延迟期间暂停,但消费者不断轮询代理,因此较长的延迟不会触发重新平衡。

根据您的日志,重新平衡来自主要主题的分区,因此它不太可能与此功能有任何关系。

编辑 4:可重试主题功能已在 Spring 中针对 Apache Kafka 2.7.0 发布,它使用 kafka-clients 2.7.0。但是,该功能已有多项改进,因此我建议尽可能使用最新的 Spring Kafka 版本(当前为 2.8.3)以从中受益。