spring-kafka错误处理器调用次数过多
spring-kafka error handler invoked too many times
我正在使用 spring 的 SeekToCurrentErrorHandler
和 DeadLetterPublishingRecoverer
。
每次调用错误处理程序时都会记录一条消息,并且还会将一个事件发送到我们的监控系统。
我看到的问题是错误处理程序被调用太多次导致失败的连续记录。
例如,对于不可重试的异常,如果我产生 2 个(错误)消息,我会得到以下日志:
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error log
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| <== my error log
|INFO|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] o.a.k.c.c.KafkaConsumer:1603|orgTest|projectTest|dev|input| [Consumer clientId=consumer-debug-2, groupId=debug] Seeking to offset 5 for partition debug-2
DEBUG|||2020-10-22 13:55:42 [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication
|ERROR|Y|f0ecbdfa-3a13-4435-9cd1-b3daf73d324d|2020-10-22 13:55:42 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error logs
DEBUG|||2020-10-22 13:55:42 [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication
对于 x 个消费者错误消息,我似乎得到了:x 对错误处理程序的调用、x-1 次对错误处理程序的调用、x-2 次调用等。
可重试异常也是如此,我看到每次重试都一样。
Consumer函数调用正确,只是error handler触发次数过多
这是我的错误处理配置:
public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
private final Monitor monitor;
CustomSeekToCurrentErrorHandler(Monitor monitor, DeadLetterPublishingRecoverer dlpr, FixedBackOff retries) {
super(dlpr, retries);
super.setLogLevel(KafkaException.Level.DEBUG);
this.monitor = monitor;
}
@Override
public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
if (!records.isEmpty()) {
records.forEach(record -> {
logAndReportError(record);
});
}
super.handle(exception, records, consumer, container);
}
}
@Bean
public SeekToCurrentErrorHandler replayDeadLetterErrorHandler(DeadLetterPublishingRecoverer dlpr, FixedBackOff fxboff) {
var seekToCurrent = new CustomSeekToCurrentErrorHandler(monitor, dlpr, fxboff);
seekToCurrent.addNotRetryableException(SomeFatalException.class);
return seekToCurrent;
}
我有两个问题:
- 为什么错误处理程序被触发了这么多次?
- 为什么要针对不可重试的异常执行搜索?
你的问题不清楚;请添加您的代码和配置。
我们必须在失败的记录之后查找记录,以便在下一次轮询时重新传送它们。
我正在使用 spring 的 SeekToCurrentErrorHandler
和 DeadLetterPublishingRecoverer
。
每次调用错误处理程序时都会记录一条消息,并且还会将一个事件发送到我们的监控系统。
我看到的问题是错误处理程序被调用太多次导致失败的连续记录。
例如,对于不可重试的异常,如果我产生 2 个(错误)消息,我会得到以下日志:
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error log
|ERROR|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| <== my error log
|INFO|Y|147e885e-9cce-4d10-972f-96613757c511|2020-10-22 13:55:41 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] o.a.k.c.c.KafkaConsumer:1603|orgTest|projectTest|dev|input| [Consumer clientId=consumer-debug-2, groupId=debug] Seeking to offset 5 for partition debug-2
DEBUG|||2020-10-22 13:55:42 [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication
|ERROR|Y|f0ecbdfa-3a13-4435-9cd1-b3daf73d324d|2020-10-22 13:55:42 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] Kafka:65|orgTest|projectTest|dev|input| my error logs
DEBUG|||2020-10-22 13:55:42 [kafka-producer-network-thread | producer-1] o.s.k.l.DeadLetterPublishingRecoverer:296||||| Successful dead-letter publication
对于 x 个消费者错误消息,我似乎得到了:x 对错误处理程序的调用、x-1 次对错误处理程序的调用、x-2 次调用等。
可重试异常也是如此,我看到每次重试都一样。 Consumer函数调用正确,只是error handler触发次数过多
这是我的错误处理配置:
public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {
private final Monitor monitor;
CustomSeekToCurrentErrorHandler(Monitor monitor, DeadLetterPublishingRecoverer dlpr, FixedBackOff retries) {
super(dlpr, retries);
super.setLogLevel(KafkaException.Level.DEBUG);
this.monitor = monitor;
}
@Override
public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
if (!records.isEmpty()) {
records.forEach(record -> {
logAndReportError(record);
});
}
super.handle(exception, records, consumer, container);
}
}
@Bean
public SeekToCurrentErrorHandler replayDeadLetterErrorHandler(DeadLetterPublishingRecoverer dlpr, FixedBackOff fxboff) {
var seekToCurrent = new CustomSeekToCurrentErrorHandler(monitor, dlpr, fxboff);
seekToCurrent.addNotRetryableException(SomeFatalException.class);
return seekToCurrent;
}
我有两个问题:
- 为什么错误处理程序被触发了这么多次?
- 为什么要针对不可重试的异常执行搜索?
你的问题不清楚;请添加您的代码和配置。
我们必须在失败的记录之后查找记录,以便在下一次轮询时重新传送它们。