回复 Kafka模板-异常处理

Reply Kafka Template - Exception handling

我正在使用 ReplyingKafkaTemplate 在两个微服务之间建立同步调用。 事件的接收者用SendTo注释如下:

@KafkaListener(topics = "${kafka.topic.prefix}"
        + "${kafka.topic.name}", containerFactory = "customEventKafkaListenerFactory")
@SendTo
public CustomResponseEvent onMessage(
        @Payload @Valid CustomRequestEvent event, @Header(KafkaHeaders.CORRELATION_ID) String correlationId,
        @Header(KafkaHeaders.REPLY_TOPIC) String replyTopic) {

   //Making some REST API calls to another external system here using RestTemplate
}

REST API 调用可以抛出 4xx 或 5xx。有多个这样的调用,一些是对内部系统的,一些是对外部系统的。这可能是一个糟糕的设计,但我们不要深入探讨。

我想要 RestTemplate 的全局异常处理程序,我可以在其中捕获所有异常,然后 return 对事件的原始发送者的响应。 我使用与消费者收到的相同 replyTopiccorrelationId 来发布事件。 但是响应的接收者仍然抛出 No pending reply 异常。

  1. 无论我上面有什么方法,是否可以实现这样一个中央错误响应事件发布者?
  2. 是否有其他最适合此异常处理的替代方法?

@KafkaListener 附带:

/**
 * Set an {@link org.springframework.kafka.listener.KafkaListenerErrorHandler} bean
 * name to invoke if the listener method throws an exception.
 * @return the error handler.
 * @since 1.3
 */
String errorHandler() default "";

那个用于捕获和处理所有下游异常,如果它 returns 一个结果,它被发送回 replyTopic:

public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
    Message<?> message = toMessagingMessage(record, acknowledgment, consumer);
    logger.debug(() -> "Processing [" + message + "]");
    try {
        Object result = invokeHandler(record, acknowledgment, message, consumer);
        if (result != null) {
            handleResult(result, record, message);
        }
    }
    catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
        if (this.errorHandler != null) {
            try {
                Object result = this.errorHandler.handleError(message, e, consumer);
                if (result != null) {
                    handleResult(result, record, message);
                }
            }
            catch (Exception ex) {
                throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss
                        "Listener error handler threw an exception for the incoming message",
                        message.getPayload()), ex);
            }
        }
        else {
            throw e;
        }
    }

有关详细信息,请参阅 RecordMessagingMessageListenerAdapter 源代码。