回复 Kafka模板-异常处理
Reply Kafka Template - Exception handling
我正在使用 ReplyingKafkaTemplate
在两个微服务之间建立同步调用。
事件的接收者用SendTo
注释如下:
@KafkaListener(topics = "${kafka.topic.prefix}"
+ "${kafka.topic.name}", containerFactory = "customEventKafkaListenerFactory")
@SendTo
public CustomResponseEvent onMessage(
@Payload @Valid CustomRequestEvent event, @Header(KafkaHeaders.CORRELATION_ID) String correlationId,
@Header(KafkaHeaders.REPLY_TOPIC) String replyTopic) {
//Making some REST API calls to another external system here using RestTemplate
}
REST API 调用可以抛出 4xx 或 5xx。有多个这样的调用,一些是对内部系统的,一些是对外部系统的。这可能是一个糟糕的设计,但我们不要深入探讨。
我想要 RestTemplate
的全局异常处理程序,我可以在其中捕获所有异常,然后 return 对事件的原始发送者的响应。
我使用与消费者收到的相同 replyTopic
和 correlationId
来发布事件。
但是响应的接收者仍然抛出 No pending reply
异常。
- 无论我上面有什么方法,是否可以实现这样一个中央错误响应事件发布者?
- 是否有其他最适合此异常处理的替代方法?
@KafkaListener
附带:
/**
* Set an {@link org.springframework.kafka.listener.KafkaListenerErrorHandler} bean
* name to invoke if the listener method throws an exception.
* @return the error handler.
* @since 1.3
*/
String errorHandler() default "";
那个用于捕获和处理所有下游异常,如果它 returns 一个结果,它被发送回 replyTopic
:
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
Message<?> message = toMessagingMessage(record, acknowledgment, consumer);
logger.debug(() -> "Processing [" + message + "]");
try {
Object result = invokeHandler(record, acknowledgment, message, consumer);
if (result != null) {
handleResult(result, record, message);
}
}
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
if (this.errorHandler != null) {
try {
Object result = this.errorHandler.handleError(message, e, consumer);
if (result != null) {
handleResult(result, record, message);
}
}
catch (Exception ex) {
throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss
"Listener error handler threw an exception for the incoming message",
message.getPayload()), ex);
}
}
else {
throw e;
}
}
有关详细信息,请参阅 RecordMessagingMessageListenerAdapter
源代码。
我正在使用 ReplyingKafkaTemplate
在两个微服务之间建立同步调用。
事件的接收者用SendTo
注释如下:
@KafkaListener(topics = "${kafka.topic.prefix}"
+ "${kafka.topic.name}", containerFactory = "customEventKafkaListenerFactory")
@SendTo
public CustomResponseEvent onMessage(
@Payload @Valid CustomRequestEvent event, @Header(KafkaHeaders.CORRELATION_ID) String correlationId,
@Header(KafkaHeaders.REPLY_TOPIC) String replyTopic) {
//Making some REST API calls to another external system here using RestTemplate
}
REST API 调用可以抛出 4xx 或 5xx。有多个这样的调用,一些是对内部系统的,一些是对外部系统的。这可能是一个糟糕的设计,但我们不要深入探讨。
我想要 RestTemplate
的全局异常处理程序,我可以在其中捕获所有异常,然后 return 对事件的原始发送者的响应。
我使用与消费者收到的相同 replyTopic
和 correlationId
来发布事件。
但是响应的接收者仍然抛出 No pending reply
异常。
- 无论我上面有什么方法,是否可以实现这样一个中央错误响应事件发布者?
- 是否有其他最适合此异常处理的替代方法?
@KafkaListener
附带:
/**
* Set an {@link org.springframework.kafka.listener.KafkaListenerErrorHandler} bean
* name to invoke if the listener method throws an exception.
* @return the error handler.
* @since 1.3
*/
String errorHandler() default "";
那个用于捕获和处理所有下游异常,如果它 returns 一个结果,它被发送回 replyTopic
:
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
Message<?> message = toMessagingMessage(record, acknowledgment, consumer);
logger.debug(() -> "Processing [" + message + "]");
try {
Object result = invokeHandler(record, acknowledgment, message, consumer);
if (result != null) {
handleResult(result, record, message);
}
}
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
if (this.errorHandler != null) {
try {
Object result = this.errorHandler.handleError(message, e, consumer);
if (result != null) {
handleResult(result, record, message);
}
}
catch (Exception ex) {
throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss
"Listener error handler threw an exception for the incoming message",
message.getPayload()), ex);
}
}
else {
throw e;
}
}
有关详细信息,请参阅 RecordMessagingMessageListenerAdapter
源代码。