我想通过注释启用 Kafka 错误处理
I want to enable Kafka errorhandling by annotation
我尝试通过注释启用 spring-kafka 错误处理。
所以我创建了一个 bean kafkaListenerErrorHandler
。
@Bean
public KafkaListenerErrorHandler kafkaListenerErrorHandler(ErrorHandler errorHandler) {
KafkaListenerErrorHandler kafkaListenerErrorHandler = (message, exception) -> {
MessageHeaders headers = message.getHeaders();
ConsumerRecord<String, EventV1> consumerRecord = new ConsumerRecord<>(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class),
headers.get(KafkaHeaders.OFFSET, Long.class),
headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY, String.class),
(EventV1) message.getPayload());
errorHandler.handle(exception, Collections.singletonList(consumerRecord), headers.get(KafkaHeaders.CONSUMER, Consumer.class), messageListenerContainer);
return null;
};
return kafkaListenerErrorHandler;
}
并通过
将其连接到KafkaListener
@Autowired(required = false)
MessageListenerContainer messageListenerContainer;
@Autowired
private UserBO userBO;
@Bean
ErrorHandler errorHandler(KafkaTemplate kafkaTemplate) {
return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0, 2));
}
@KafkaListener(topics = "#{'${kafka.listener.topics}'.split(',')}", errorHandler = "kafkaListenerErrorHandler")
public void listen(ConsumerRecord<String, EventV1> message) {
log.info("Received Messasge: {}", message);
try {
EventV1 value = message.value();
userBO.getUserById(value.getCustomerId());
} catch (Exception e) {
throw new ListenerExecutionFailedException(e.getLocalizedMessage(), e);
}
}
即使它看起来能工作,代码看起来也很难看。 kafkaListenerErrorHandler
构造一个新的 ConsumerRecord 以将其转发给 ErrorHandler 而 "messageListenerContainer" 为空,因为我没有找到如何将其放入我的上下文中。
应该或必须有一种更优雅的方式将 ErrorHandler 与 KafkaListenerErrorHandler 连接起来
我还在我的设置中添加了反序列化错误处理程序。
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
感谢您的任何建议。
您不需要侦听器错误处理程序; spring 启动会自动将 errorHandler
连接到侦听器容器,如果侦听器抛出异常,它将从那里调用。
此外,您的操作是错误的,因为错误处理程序只会查找当前记录,并且可能在上次轮询中提取了更多记录。
KafkaListenerErrorHandler
位于堆栈的较高位置,用于更高级的错误处理。
例如,在 request/reply 场景中,(侦听器方法 return 是一个回复值),侦听器错误处理程序可以 return 向请求的发送者发送错误回复消息。
编辑
异常可以分类为"not retryable"。
Starting with version 2.3, the SeekToCurrentErrorHandler
considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure.
The exceptions that are considered fatal, by default, are:
DeserializationException
MessageConversionException
MethodArgumentResolutionException
NoSuchMethodException
ClassCastException
since these exceptions are unlikely to be resolved on a retried delivery.
You can add more exception types to the not-retryable category, or completely replace the BinaryExceptionClassifier
with your own configured classifier.
See the Javadocs for SeekToCurrentErrorHandler
for more information, as well as those for the spring-retry
BinaryExceptionClassifier
.
Here is an example that adds IllegalArgumentException
to the not-retryable exceptions:
@Bean
public SeekToCurrentErrorHandler errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(recoverer);
handler.addNotRetryableException(IllegalArgumentException.class);
return handler;
}
我尝试通过注释启用 spring-kafka 错误处理。
所以我创建了一个 bean kafkaListenerErrorHandler
。
@Bean
public KafkaListenerErrorHandler kafkaListenerErrorHandler(ErrorHandler errorHandler) {
KafkaListenerErrorHandler kafkaListenerErrorHandler = (message, exception) -> {
MessageHeaders headers = message.getHeaders();
ConsumerRecord<String, EventV1> consumerRecord = new ConsumerRecord<>(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class),
headers.get(KafkaHeaders.OFFSET, Long.class),
headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY, String.class),
(EventV1) message.getPayload());
errorHandler.handle(exception, Collections.singletonList(consumerRecord), headers.get(KafkaHeaders.CONSUMER, Consumer.class), messageListenerContainer);
return null;
};
return kafkaListenerErrorHandler;
}
并通过
将其连接到KafkaListener
@Autowired(required = false)
MessageListenerContainer messageListenerContainer;
@Autowired
private UserBO userBO;
@Bean
ErrorHandler errorHandler(KafkaTemplate kafkaTemplate) {
return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0, 2));
}
@KafkaListener(topics = "#{'${kafka.listener.topics}'.split(',')}", errorHandler = "kafkaListenerErrorHandler")
public void listen(ConsumerRecord<String, EventV1> message) {
log.info("Received Messasge: {}", message);
try {
EventV1 value = message.value();
userBO.getUserById(value.getCustomerId());
} catch (Exception e) {
throw new ListenerExecutionFailedException(e.getLocalizedMessage(), e);
}
}
即使它看起来能工作,代码看起来也很难看。 kafkaListenerErrorHandler
构造一个新的 ConsumerRecord 以将其转发给 ErrorHandler 而 "messageListenerContainer" 为空,因为我没有找到如何将其放入我的上下文中。
应该或必须有一种更优雅的方式将 ErrorHandler 与 KafkaListenerErrorHandler 连接起来
我还在我的设置中添加了反序列化错误处理程序。
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
感谢您的任何建议。
您不需要侦听器错误处理程序; spring 启动会自动将 errorHandler
连接到侦听器容器,如果侦听器抛出异常,它将从那里调用。
此外,您的操作是错误的,因为错误处理程序只会查找当前记录,并且可能在上次轮询中提取了更多记录。
KafkaListenerErrorHandler
位于堆栈的较高位置,用于更高级的错误处理。
例如,在 request/reply 场景中,(侦听器方法 return 是一个回复值),侦听器错误处理程序可以 return 向请求的发送者发送错误回复消息。
编辑
异常可以分类为"not retryable"。
Starting with version 2.3, the
SeekToCurrentErrorHandler
considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure. The exceptions that are considered fatal, by default, are:
DeserializationException
MessageConversionException
MethodArgumentResolutionException
NoSuchMethodException
ClassCastException
since these exceptions are unlikely to be resolved on a retried delivery.
You can add more exception types to the not-retryable category, or completely replace the
BinaryExceptionClassifier
with your own configured classifier. See the Javadocs forSeekToCurrentErrorHandler
for more information, as well as those for thespring-retry
BinaryExceptionClassifier
.Here is an example that adds
IllegalArgumentException
to the not-retryable exceptions:
@Bean
public SeekToCurrentErrorHandler errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(recoverer);
handler.addNotRetryableException(IllegalArgumentException.class);
return handler;
}