记录 spring kafka listener 抛出的异常
Log the exceptions thrown in spring kafka listener
我们正在使用 Spring Kafka 2.7 的非阻塞重试机制,希望把 @KafkaListener 方法在消费消息时抛出的异常记录到日志中。
在我们的例子中,监听器抛出了一个 RuntimeException,但该异常并没有被记录;日志里只出现 "Seek to current after exception ....." 这样的信息。
// our configuration
/**
 * Creates a concurrent listener container factory for String keys and Object values,
 * wired with the consumer factory returned by {@code consumerFactory()}.
 *
 * @return the configured container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    return containerFactory;
}
/**
 * Assembles the retry-topic configuration from a {@link RetryTopicConfigurationBuilder}.
 *
 * <p>The builder is given: the DLT handler method {@code "xyz"} on {@code XYZ}, an
 * exponential back-off built from {@code delayMs}, {@code backoffMultiplier} and
 * {@code maxIntervalInMs}, {@code retryAttempt} as the maximum number of attempts,
 * {@link IllegalArgumentException} and {@link IllegalAccessException} as the
 * not-retried exception types, no auto-creation of retry topics, the listener factory
 * from {@code kafkaListenerContainerFactory()}, and index-value topic suffixing.
 *
 * @param template the Kafka template passed to the builder's {@code create} call
 * @return the resulting retry-topic configuration
 */
@Bean
public RetryTopicConfiguration retryTopicConfiguration(KafkaTemplate<String, Object> template) {
    // Exception types handed to notRetryOn(...).
    final List<Class<? extends Throwable>> nonRetryableTypes =
            Arrays.asList(IllegalArgumentException.class, IllegalAccessException.class);

    final RetryTopicConfigurationBuilder configuredBuilder = RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod(XYZ.class, "xyz")
            .exponentialBackoff(delayMs, backoffMultiplier, maxIntervalInMs)
            .maxAttempts(retryAttempt)
            .notRetryOn(nonRetryableTypes)
            .doNotAutoCreateRetryTopics()
            .listenerFactory(kafkaListenerContainerFactory())
            .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE);

    return configuredBuilder.create(template);
}
您可以使用 RecordInterceptor,并在其 failure() 方法中记录异常:
/**
 * Creates a {@link RecordInterceptor} that logs failed records and installs it on the
 * supplied container factory.
 *
 * <p>{@code intercept} returns each record unchanged; {@code failure} logs the record
 * (rendered via {@code ListenerUtils.recordToString(record, true)}) together with the
 * exception at error level.
 *
 * @param factory the container factory on which the interceptor is set
 * @return the interceptor that was set on {@code factory}
 */
@Bean
RecordInterceptor<String, String> interceptor(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
    final RecordInterceptor<String, String> loggingInterceptor = new RecordInterceptor<String, String>() {

        @Override
        public void failure(ConsumerRecord<String, String> record, Exception exception,
                Consumer<String, String> consumer) {
            final String message = "Record failed " + ListenerUtils.recordToString(record, true);
            logger.error(message, exception);
        }

        @Override
        @Nullable
        public ConsumerRecord<String, String> intercept(ConsumerRecord<String, String> record) {
            // Hand every record back untouched.
            return record;
        }
    };

    factory.setRecordInterceptor(loggingInterceptor);
    return loggingInterceptor;
}
我们正在使用 Spring Kafka 2.7 的非阻塞重试机制,希望把 @KafkaListener 方法在消费消息时抛出的异常记录到日志中。
在我们的例子中,监听器抛出了一个 RuntimeException,但该异常并没有被记录;日志里只出现 "Seek to current after exception ....." 这样的信息。
// our configuration
/**
 * Creates a concurrent listener container factory for String keys and Object values,
 * wired with the consumer factory returned by {@code consumerFactory()}.
 *
 * @return the configured container factory
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<String, Object> containerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    containerFactory.setConsumerFactory(consumerFactory());
    return containerFactory;
}
/**
 * Assembles the retry-topic configuration from a {@link RetryTopicConfigurationBuilder}.
 *
 * <p>The builder is given: the DLT handler method {@code "xyz"} on {@code XYZ}, an
 * exponential back-off built from {@code delayMs}, {@code backoffMultiplier} and
 * {@code maxIntervalInMs}, {@code retryAttempt} as the maximum number of attempts,
 * {@link IllegalArgumentException} and {@link IllegalAccessException} as the
 * not-retried exception types, no auto-creation of retry topics, the listener factory
 * from {@code kafkaListenerContainerFactory()}, and index-value topic suffixing.
 *
 * @param template the Kafka template passed to the builder's {@code create} call
 * @return the resulting retry-topic configuration
 */
@Bean
public RetryTopicConfiguration retryTopicConfiguration(KafkaTemplate<String, Object> template) {
    // Exception types handed to notRetryOn(...).
    final List<Class<? extends Throwable>> nonRetryableTypes =
            Arrays.asList(IllegalArgumentException.class, IllegalAccessException.class);

    final RetryTopicConfigurationBuilder configuredBuilder = RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod(XYZ.class, "xyz")
            .exponentialBackoff(delayMs, backoffMultiplier, maxIntervalInMs)
            .maxAttempts(retryAttempt)
            .notRetryOn(nonRetryableTypes)
            .doNotAutoCreateRetryTopics()
            .listenerFactory(kafkaListenerContainerFactory())
            .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE);

    return configuredBuilder.create(template);
}
您可以使用 RecordInterceptor,并在其 failure() 方法中记录异常:
/**
 * Creates a {@link RecordInterceptor} that logs failed records and installs it on the
 * supplied container factory.
 *
 * <p>{@code intercept} returns each record unchanged; {@code failure} logs the record
 * (rendered via {@code ListenerUtils.recordToString(record, true)}) together with the
 * exception at error level.
 *
 * @param factory the container factory on which the interceptor is set
 * @return the interceptor that was set on {@code factory}
 */
@Bean
RecordInterceptor<String, String> interceptor(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
    final RecordInterceptor<String, String> loggingInterceptor = new RecordInterceptor<String, String>() {

        @Override
        public void failure(ConsumerRecord<String, String> record, Exception exception,
                Consumer<String, String> consumer) {
            final String message = "Record failed " + ListenerUtils.recordToString(record, true);
            logger.error(message, exception);
        }

        @Override
        @Nullable
        public ConsumerRecord<String, String> intercept(ConsumerRecord<String, String> record) {
            // Hand every record back untouched.
            return record;
        }
    };

    factory.setRecordInterceptor(loggingInterceptor);
    return loggingInterceptor;
}