Kafka 消费者错误处理:混淆 ErrorHandling / RetryTemplate

Kafka Consumer Error Handling : confusion ErrorHandling / RetryTemplate

我在使用消息时尝试进行以下错误处理:

我所拥有的(Spring kafka 2.5.5.RELEASE with 2.5.1 Kafka Client)如下:

@Bean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, MyObject> kafkaListenerContainerFactory() throws ExceptionTechnique {
        ConcurrentKafkaListenerContainerFactory<String, MyObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(RECORD);
        factory.setRetryTemplate(retryTemplate());
        factory.setRecoveryCallback(context -> {
           Object record = context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
           LOGGER_TECHNIQUE.error("Fail to handle message after {} retries. {}",
                context.getRetryCount(), record);
           return record;
        });
        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaOperations,
                        (cr, e) -> new TopicPartition("myTopicToRead.dlq", cr.partition())),
                        new FixedBackOff(10000L, 2L)));
        factory.setConcurrency(kafkaListenerConcurrency);
        factory.setStatefulRetry(true);

        return factory;
    }

@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    retryTemplate.setRetryPolicy(retryPolicy());
    return retryTemplate;
}

private BackOffPolicy backOffPolicy() {
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    fixedBackOffPolicy.setBackOffPeriod(10000);
    return fixedBackOffPolicy;
}

private SimpleRetryPolicy retryPolicy() {
    Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();

    exceptionMap.put(IllegalArgumentException.class, false);
    exceptionMap.put(TimeoutException.class, false);
    exceptionMap.put(ListenerExecutionFailedException.class, true);
    // Custom exception
    exceptionMap.put(MyTechnicalException.class, false);
    exceptionMap.put(MyFunctionalException.class, false);

    return new SimpleRetryPolicy(3, exceptionMap, true);
}

现在,如果我发送不可序列化的消息,我的消息会在不重试的情况下进入 DLT -> OK!

在我的 @KafkaHandler 中,我有一个 throw new MyTechnicalException,被捕获并重新抛出。

我应该没有重试,但我重试了 2 次,每次 20 秒(而不是 10 秒?),并且在 2 次重试后向 DLT 发送了一条消息。

如果我删除 errorHandler,毫不奇怪,尝试 3 次并显示我的日志错误消息...但我需要发送到 DLQ...

如果我删除 RetryTemplate 和 RecoveryCallback,不足为奇,但所有异常都会重试...

问题:

  1. 有没有办法处理我的用例?怎么样?
  2. 出厂配置中的RecoveryCallback和ErrorHandler有什么区别?
  3. 重试取决于异常类型如何工作?它适用于我们的异常吗?

侦听器级别的重试已被弃用,因为错误处理程序已经发展到涵盖 RetryTemplate 提供的所有功能 - 回退、异常分类等。

https://github.com/spring-projects/spring-kafka/issues/1886