spring-kafka错误处理器调用次数过多

spring-kafka error handler invoked too many times

我正在使用 spring 的 SeekToCurrentErrorHandlerDeadLetterPublishingRecoverer
每次调用错误处理程序时都会记录一条消息,并且还会将一个事件发送到我们的监控系统。
我看到的问题是错误处理程序被调用太多次导致失败的连续记录。
例如,对于不可重试的异常,如果我产生 2 个(错误)消息,我会得到以下日志:

|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error log
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| <== my error log
|INFO|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] o.a.k.c.c.KafkaConsumer:1603|orgTest|projectTest|dev|input| [Consumer clientId=consumer-debug-2, groupId=debug] Seeking to offset 5 for partition debug-2
DEBUG|||2020-10-22 13:55:42  [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication
|ERROR|Y|f0ecbdfa-3a13-4435-9cd1-b3daf73d324d|2020-10-22 13:55:42  [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error logs
DEBUG|||2020-10-22 13:55:42  [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication

对于 x 个消费者错误消息,我似乎得到了:x 对错误处理程序的调用、x-1 次对错误处理程序的调用、x-2 次调用等。

可重试异常也是如此,我看到每次重试都一样。 Consumer函数调用正确,只是error handler触发次数过多

这是我的错误处理配置:

public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
    private final Monitor monitor;

    CustomSeekToCurrentErrorHandler(Monitor monitor, DeadLetterPublishingRecoverer dlpr, FixedBackOff retries) {
        super(dlpr, retries);
        super.setLogLevel(KafkaException.Level.DEBUG);
        this.monitor = monitor;
    }

    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        if (!records.isEmpty()) {
            records.forEach(record -> {
                logAndReportError(record);
            });
        }
        super.handle(exception, records, consumer, container);
    }
}

@Bean
public SeekToCurrentErrorHandler replayDeadLetterErrorHandler(DeadLetterPublishingRecoverer dlpr, FixedBackOff fxboff) {
    var seekToCurrent = new CustomSeekToCurrentErrorHandler(monitor, dlpr, fxboff);
   seekToCurrent.addNotRetryableException(SomeFatalException.class);
   return seekToCurrent;
}
    

我有两个问题:

  1. 为什么错误处理程序被触发了这么多次?
  2. 为什么要针对不可重试的异常执行搜索?
  1. 你的问题不清楚;请添加您的代码和配置。

  2. 我们必须在失败的记录之后查找记录,以便在下一次轮询时重新传送它们。