我想通过注释启用 Kafka 错误处理

I want to enable Kafka errorhandling by annotation

我尝试通过注释启用 spring-kafka 错误处理。

所以我创建了一个 bean kafkaListenerErrorHandler

@Bean
public KafkaListenerErrorHandler kafkaListenerErrorHandler(ErrorHandler errorHandler) {
    KafkaListenerErrorHandler kafkaListenerErrorHandler = (message, exception) -> {
        MessageHeaders headers = message.getHeaders();
        ConsumerRecord<String, EventV1> consumerRecord = new ConsumerRecord<>(
                headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
                headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class),
                headers.get(KafkaHeaders.OFFSET, Long.class),
                headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY, String.class),
                (EventV1) message.getPayload());
        errorHandler.handle(exception, Collections.singletonList(consumerRecord), headers.get(KafkaHeaders.CONSUMER, Consumer.class), messageListenerContainer);
        return null;
    };

    return kafkaListenerErrorHandler;
}

并通过

将其连接到KafkaListener
@Autowired(required = false)
MessageListenerContainer messageListenerContainer;

@Autowired
private UserBO userBO;

@Bean
ErrorHandler errorHandler(KafkaTemplate kafkaTemplate) {
    return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0, 2));
}

@KafkaListener(topics = "#{'${kafka.listener.topics}'.split(',')}", errorHandler = "kafkaListenerErrorHandler")
public void listen(ConsumerRecord<String, EventV1> message) {

    log.info("Received Messasge: {}", message);
    try {
        EventV1 value = message.value();
        userBO.getUserById(value.getCustomerId());
    } catch (Exception e) {
        throw new ListenerExecutionFailedException(e.getLocalizedMessage(), e);
    }
}

即使它看起来能工作,代码看起来也很难看。 kafkaListenerErrorHandler 构造一个新的 ConsumerRecord 以将其转发给 ErrorHandler 而 "messageListenerContainer" 为空,因为我没有找到如何将其放入我的上下文中。

应该或必须有一种更优雅的方式将 ErrorHandler 与 KafkaListenerErrorHandler 连接起来

我还在我的设置中添加了反序列化错误处理程序。

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2 spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages=* 感谢您的任何建议。

您不需要侦听器错误处理程序; spring 启动会自动将 errorHandler 连接到侦听器容器,如果侦听器抛出异常,它将从那里调用。

此外,您的操作是错误的,因为错误处理程序只会查找当前记录,并且可能在上次轮询中提取了更多记录。

KafkaListenerErrorHandler 位于堆栈的较高位置,用于更高级的错误处理。

例如,在 request/reply 场景中,(侦听器方法 return 是一个回复值),侦听器错误处理程序可以 return 向请求的发送者发送错误回复消息。

编辑

异常可以分类为"not retryable"。

参见 the documentation

Starting with version 2.3, the SeekToCurrentErrorHandler considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure. The exceptions that are considered fatal, by default, are:

  • DeserializationException
  • MessageConversionException
  • MethodArgumentResolutionException
  • NoSuchMethodException
  • ClassCastException

since these exceptions are unlikely to be resolved on a retried delivery.

You can add more exception types to the not-retryable category, or completely replace the BinaryExceptionClassifier with your own configured classifier. See the Javadocs for SeekToCurrentErrorHandler for more information, as well as those for the spring-retry BinaryExceptionClassifier.

Here is an example that adds IllegalArgumentException to the not-retryable exceptions:

@Bean
public SeekToCurrentErrorHandler errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(recoverer);
    handler.addNotRetryableException(IllegalArgumentException.class);
    return handler;
}