AMQP Spring 集成错误处理
AMQP Spring Integration error handling
我的集成流程如下所示:
@Bean
public IntegrationFlow auditFlow(@Qualifier("eventLoggingConnectionFactory") ConnectionFactory connectionFactory,
@Qualifier("writeChannel") MessageChannel messageChannel,
@Qualifier("parseErrorChannel") MessageChannel errorChannel) {
return IntegrationFlows
.from(Amqp.inboundAdapter(connectionFactory, auditQueue)
.errorChannel(errorChannel)
.concurrentConsumers(numConsumers)
.messageConverter(new MongoMessageConverter())) // converts JSON to org.bson.Document
.enrichHeaders(e -> e.<Document>headerFunction(MongoWriteConfiguration.MONGO_COLLECTION_HEADER_KEY,
o -> getNamespace(o.getPayload())))
.channel(messageChannel)
.get();
}
如果引入格式错误的消息,消息转换器当然会出错,从而引发 MessageConversionException。在这种情况下,我当然不希望消息重新排队——但我也不想将默认设置为不重新排队被拒绝的消息,就像我在 AmqpInboundAdapterSpec 上所做的那样。什么是我不重新排队以这种方式出错的消息(并以其他方式重新发布它们以进行调试)的正确方法?
更一般地说,同一流中的下游进程可能会因语义格式更错误的数据而出错——再一次,我不想对它们重新排队。那时我可以抛出一个 AmqpRejectAndDontRequeueException
,但随后我失去了关注点分离,这是重点的一半。对这些异常采取行动的正确方法是什么——也许有一种方法可以转化为 AmqpRejectAndDontRequeueException
?
入站适配器的 SimpleMessageListenerContainer
(ConditionalRejectingErrorHandler
)中的默认错误处理程序就是这样做的(它检测到 MessageConversionException
并抛出 AmqpRejectAndDontRequeueException
)。
您可以通过注入自己的 FatalExceptionStrategy
来自定义 ConditionalRejectingErrorHandler
以查找其他异常类型并以相同的方式处理它们。
DefaultExceptionStrategy
看起来像这样...
@Override
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException
&& t.getCause() instanceof MessageConversionException) {
if (logger.isWarnEnabled()) {
logger.warn("Fatal message conversion error; message rejected; "
+ "it will be dropped or routed to a dead letter exchange, if so configured: "
+ ((ListenerExecutionFailedException) t).getFailedMessage(), t);
}
return true;
}
return false;
}
只有当原因链中还没有 AmqpRejectAndDontRequeueException
时才会调用它。
我的集成流程如下所示:
@Bean
public IntegrationFlow auditFlow(@Qualifier("eventLoggingConnectionFactory") ConnectionFactory connectionFactory,
@Qualifier("writeChannel") MessageChannel messageChannel,
@Qualifier("parseErrorChannel") MessageChannel errorChannel) {
return IntegrationFlows
.from(Amqp.inboundAdapter(connectionFactory, auditQueue)
.errorChannel(errorChannel)
.concurrentConsumers(numConsumers)
.messageConverter(new MongoMessageConverter())) // converts JSON to org.bson.Document
.enrichHeaders(e -> e.<Document>headerFunction(MongoWriteConfiguration.MONGO_COLLECTION_HEADER_KEY,
o -> getNamespace(o.getPayload())))
.channel(messageChannel)
.get();
}
如果引入格式错误的消息,消息转换器当然会出错,从而引发 MessageConversionException。在这种情况下,我当然不希望消息重新排队——但我也不想将默认设置为不重新排队被拒绝的消息,就像我在 AmqpInboundAdapterSpec 上所做的那样。什么是我不重新排队以这种方式出错的消息(并以其他方式重新发布它们以进行调试)的正确方法?
更一般地说,同一流中的下游进程可能会因语义格式更错误的数据而出错——再一次,我不想对它们重新排队。那时我可以抛出一个 AmqpRejectAndDontRequeueException
,但随后我失去了关注点分离,这是重点的一半。对这些异常采取行动的正确方法是什么——也许有一种方法可以转化为 AmqpRejectAndDontRequeueException
?
入站适配器的 SimpleMessageListenerContainer
(ConditionalRejectingErrorHandler
)中的默认错误处理程序就是这样做的(它检测到 MessageConversionException
并抛出 AmqpRejectAndDontRequeueException
)。
您可以通过注入自己的 FatalExceptionStrategy
来自定义 ConditionalRejectingErrorHandler
以查找其他异常类型并以相同的方式处理它们。
DefaultExceptionStrategy
看起来像这样...
@Override
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException
&& t.getCause() instanceof MessageConversionException) {
if (logger.isWarnEnabled()) {
logger.warn("Fatal message conversion error; message rejected; "
+ "it will be dropped or routed to a dead letter exchange, if so configured: "
+ ((ListenerExecutionFailedException) t).getFailedMessage(), t);
}
return true;
}
return false;
}
只有当原因链中还没有 AmqpRejectAndDontRequeueException
时才会调用它。