SeekToCurrentErrorHandler + ExponentialBackOff will log errors after backOff has passed, is that intentional?

Context:

The documentation on "stateful retry" (https://docs.spring.io/spring-kafka/reference/html/#stateful-retry) and "seek to current" (https://docs.spring.io/spring-kafka/reference/html/#seek-to-current) sounds like, as a user, I should migrate away from the RetryTemplate and use the BackOff functionality of the SeekToCurrentErrorHandler instead.

I currently use a mix of a RetryTemplate that retries indefinitely for certain exceptions, plus a SeekToCurrentErrorHandler with a fixed 3 retries for all other exceptions.

Now I wanted to replace this setup with handler.setBackOffFunction((record, ex) -> { ... });, but I keep running into the problem described below.
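For context, the replacement I had in mind looks roughly like this sketch (MyRetryableException is a placeholder for my real exception types; setBackOffFunction is available since spring-kafka 2.6):

```java
SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(
        (rec, ex) -> System.out.println("I am the dlq"), new FixedBackOff(0L, 2L));
// Choose a BackOff per record/exception instead of mixing in a RetryTemplate.
handler.setBackOffFunction((record, ex) -> {
    if (ex.getCause() instanceof MyRetryableException) { // placeholder type
        return new ExponentialBackOff();  // keep retrying with growing delays
    }
    return new FixedBackOff(0L, 2L);      // everything else: 3 attempts total
});
```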

I am not sure whether this is intentional, whether I have misconfigured something, or whether it is a bug.

Problem:

When I use the SeekToCurrentErrorHandler with a large interval, the "hey, your listener threw an exception" error message only seems to be logged after the interval has elapsed. Is that intentional? My code throws the exception, but the log entry may show up much later.

Here we process foo1 at 22:59:14. The exception is thrown right afterwards, but it only appears in the log 10 seconds later, at 22:59:24. With an ExponentialBackOff, that gap grows larger and larger.
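To illustrate how the gap grows: as far as I can tell, ExponentialBackOff defaults to a 2000 ms initial interval, multiplier 1.5, capped at 30000 ms, so the delay before each retry (and thus before the log entry appears) progresses like this plain-Java sketch:

```java
public class BackOffProgression {
    public static void main(String[] args) {
        // ExponentialBackOff defaults (assumed): initial 2000 ms, x1.5, max 30000 ms
        long interval = 2000;
        for (int attempt = 1; attempt <= 8; attempt++) {
            System.out.println("attempt " + attempt + ": sleep " + interval + " ms before re-seek");
            interval = Math.min((long) (interval * 1.5), 30000);
        }
        // intervals: 2000, 3000, 4500, 6750, 10125, 15187, 22780, 30000
    }
}
```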

foo1 at 2020-11-20T22:59:14.850721600
2020-11-20 22:59:24.976  INFO 21456 --- [  kgh1235-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-kgh1235-1, groupId=kgh1235] Seeking to offset 21 for partition kgh1235-0
2020-11-20 22:59:24.976 ERROR 21456 --- [  kgh1235-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.roppelt.Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T22:59:14.850721600; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T22:59:14.850721600
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:157) ~[spring-kafka-2.6.3.jar:2.6.3]

Reproducible with the following:

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @KafkaListener(id = "so64937593", topics = "so64937593")
    public void listen(String in) {
        LocalDateTime now = LocalDateTime.now();
        System.out.println(in + " at " + now);
        throw new RuntimeException("Thrown at " + now);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(
                (rec, ex) -> System.out.println("I am the dlq"), new ExponentialBackOff());
        factory.setErrorHandler(eh);
        return factory;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so64937593", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("so64937593", "foo" + i));
        };
    }

}

I found that the following lets me log immediately:

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setListeners(new RetryListener[]{new RetryListener() {
    @Override
    public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
        return true;
    }
    @Override
    public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
    }
    @Override
    public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        System.out.println(LocalDateTime.now() + "Oops happened at " + context);
    }
}});
retryTemplate.setRetryPolicy(new NeverRetryPolicy());
factory.setRetryTemplate(retryTemplate);

So I added a RetryTemplate with a NeverRetryPolicy, just so I can use its listener for logging. With that in place, I now get immediate feedback when the error occurs:

foo1 at 2020-11-20T23:05:57.769425100
2020-11-20T23:05:57.769425100Oops happened at [RetryContext: count=1, lastException=org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void de.roppelt.Application.listen(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: Thrown at 2020-11-20T23:05:57.769425100, exhausted=false]
2020-11-20 23:06:07.897  INFO 22608 --- [  kgh1235-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-kgh1235-1, groupId=kgh1235] Seeking to offset 21 for partition kgh1235-0
2020-11-20 23:06:07.897 ERROR 22608 --- [  kgh1235-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

I think this is a bug, but before filing a GitHub issue I wanted to ask here first.

The log is written by the container after the error handler exits (we have no choice in that).

However, you can suppress those logs by changing the log level on the SeekToCurrentErrorHandler. It sets the level on the exception, and the container logs it at that level.

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    Assert.notNull(logLevel, "'logLevel' cannot be null");
    this.logLevel = logLevel;
}

This has been available since version 2.5.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers

Typically, the error handlers provided by the framework will throw an exception when the error is not "handled" (e.g. after performing a seek operation). By default, such exceptions are logged by the container at ERROR level. Starting with version 2.5, all the framework error handlers extend KafkaExceptionLogLevelAware which allows you to control the level at which these exceptions are logged.
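Applied to the bean from the question, suppressing the container's ERROR entry is just a one-line change (a sketch; DEBUG is one choice of level):

```java
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(
        (rec, ex) -> System.out.println("I am the dlq"), new ExponentialBackOff());
// Container now logs the "Seek to current after exception" at DEBUG instead of ERROR.
eh.setLogLevel(KafkaException.Level.DEBUG);
factory.setErrorHandler(eh);
```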

If you want to log the root cause (the original exception) before the back off, you can subclass the error handler and log it before calling super.handle(...).

If you would like an option for the framework itself to log the error before backing off, feel free to open a GitHub issue.

Based on Gary's answer, I came up with the following.

It logs the exception immediately, along with some context (attempt, topic, offset). This is somewhat similar to what a LoggingListener on a RetryTemplate provides.

public class LoggingSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

    public LoggingSeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> handler, BackOff backOff) {
        super(handler, backOff);
    }

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        ConsumerRecord<?, ?> record = records.get(0);
        TopicPartitionOffset recordPosition = new TopicPartitionOffset(record.topic(), record.partition(), record.offset());
        String delivery = container.getContainerProperties().isDeliveryAttemptHeader() ? ""+deliveryAttempt(recordPosition) : "NA";
        log.error("Listener threw an exception, attempt {}. On partition {} at offset {}. Record timestamp {}", delivery, recordPosition.getTopicPartition(), recordPosition.getOffset(), record.timestamp(), thrownException);

        super.handle(thrownException, records, consumer, container);
    }
}

Added to the factory:

factory.getContainerProperties().setDeliveryAttemptHeader(true); // optional, see https://docs.spring.io/spring-kafka/reference/html/#delivery-header
errorHandler.setLogLevel(KafkaException.Level.DEBUG); // since our handler already logs. Prevent double logging of stacktrace

Resulting sample:

@Slf4j
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @KafkaListener(id = "so64937593", topics = "so64937593")
    public void listen(String in) {
        log.info("Processing {}", in);
        throw new RuntimeException("Thrown at " + LocalDateTime.now());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setDeliveryAttemptHeader(true); // optional, see https://docs.spring.io/spring-kafka/reference/html/#delivery-header
        SeekToCurrentErrorHandler eh = new LoggingSeekToCurrentErrorHandler(
                (rec, ex) -> log.error("I am the dlq"), new FixedBackOff(10_000, Long.MAX_VALUE));
        eh.setLogLevel(KafkaException.Level.DEBUG); // since our handler already logs. Prevent double logging of stacktrace
        factory.setErrorHandler(eh);
        return factory;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so64937593", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send("so64937593", "foo" + i));
        };
    }


    public static class LoggingSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

        public LoggingSeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> handler, BackOff backOff) {
            super(handler, backOff);
        }

        @Override
        public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
            ConsumerRecord<?, ?> record = records.get(0);
            TopicPartitionOffset recordPosition = new TopicPartitionOffset(record.topic(), record.partition(), record.offset());
            String delivery = container.getContainerProperties().isDeliveryAttemptHeader() ? ""+deliveryAttempt(recordPosition) : "NA";
            log.error("Listener threw an exception, attempt {}. On partition {} at offset {}. Record timestamp {}", delivery, recordPosition.getTopicPartition(), recordPosition.getOffset(), record.timestamp(), thrownException);

            super.handle(thrownException, records, consumer, container);
        }
    }
}