聚合后未调用配置的 errorChannel
Configured errorChannel not called after aggregation
我们在集成流程中遇到了一个奇怪的行为,即 errorChannel 不会收到消息,以防在聚合 之后的步骤中抛出异常。
这是(减少的)流量:
@Bean
public StandardIntegrationFlow startKafkaInbound() {
return IntegrationFlows.from(Kafka
.messageDrivenChannelAdapter(
kafkaConsumerFactory,
ListenerMode.record,
serviceProperties.getInputTopic().getName())
.errorChannel(errorHandler.getInputChannel())
)
.channel(nextChannel().getInputChannel())
.get();
}
@Bean
public IntegrationFlow nextChannel() {
return IntegrationFlows.from("next")
.transform(Transformers.fromJson(MyObject.class)) // An exception here is sent to errorChannel
.aggregate(aggregatorSpec ->
aggregatorSpec
.releaseStrategy(new MessageCountReleaseStrategy(100))
.sendPartialResultOnExpiry(true)
.groupTimeout(2000L)
.expireGroupsUponCompletion(true)
.correlationStrategy(message -> KafkaHeaderUtils.getOrDefault(message.getHeaders(), MY_CORRELATION_HEADER, ""))
)
.transform(myObjectTransformer) // Exception here is not sent to errorChannel
.channel(acknowledgeMyObjectFlow().getInputChannel())
.get();
}
如果我们添加一个不是 DirectChannel 类型的显式通道,错误处理将按预期工作。工作代码如下:
// ...
.aggregate(aggregatorSpec -> ...)
.channel(MessageChannels.queue())
.transform(myObjectTransformer) // Now the exception is sent to errorChannel
.channel(acknowledgeMyObjectFlow().getInputChannel())
// ...
我们还想提一下,我们有一个非常相似的流程,其中 errorHandling 按预期工作(异常发送到 errorChannel)
所以我们实际上能够获得代码 运行,但是由于错误处理是应用程序中非常关键的部分,我们真的很想了解如何确保每个错误都将发送到配置的通道以及为什么显式设置 QueueChannel 会导致所需的行为。
提前致谢
你可以添加这个
.enrichHeaders(headers -> headers.header(MessageHeaders.ERROR_CHANNEL, (errorHandler.getInputChannel()))
在聚合器之前。
这里的 .channel(MessageChannels.queue())
是误导性的,因为错误被发送到全局 errorChannel
,这显然与你的相同 errorHandler.getInputChannel()
.
.groupTimeout(2000L)
的问题是在单独的 TaskScheduler
线程上完成的,当下游发生错误时,Kafka.messageDrivenChannelAdapter
中没有关于 try..catch
的知识。
欢迎提出 GH 问题,因此我们会考虑将 errorChannel
填充到来自 MessageProducerSupport
的消息 headers 中,例如 Kafka.messageDrivenChannelAdapter
。因此,无论下游流的异步性质如何,错误处理都是相同的。
更新
请尝试将此作为解决方案:
.transform(Transformers.fromJson(MyDataObject.class)) // An exception here is sent to errorChannel
.enrichHeaders(headers -> headers.header(MessageHeaders.ERROR_CHANNEL, (errorHandler.getInputChannel())))
.aggregate(aggregatorSpec ->
enrichHeaders()
应该可以确定发送错误的正确错误通道。
加上你的 MyDataObjectTransformer
必须修改为:
throw new MessageTransformationException(source, "test");
关键是端点捕获异常时有这样的逻辑:
if (handler != null) {
try {
handler.handleMessage(message);
return true;
}
catch (Exception e) {
throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
() -> "Dispatcher failed to deliver Message", e);
}
}
其中:
if (!(ex instanceof MessagingException) ||
((MessagingException) ex).getFailedMessage() == null) {
runtimeException = new MessageDeliveryException(message, text.get(), ex);
}
然后在 AbstractCorrelatingMessageHandler
:
catch (MessageDeliveryException ex) {
logger.warn(ex, () ->
"The MessageGroup [" + groupId +
"] is rescheduled by the reason of: ");
scheduleGroupToForceComplete(groupId);
}
这就是你的异常没有到达错误通道的原因。
您可以考虑不使用那个 MessageTransformationException
。包装处理程序中的逻辑是这样的:
protected Object handleRequestMessage(Message<?> message) {
try {
return this.transformer.transform(message);
}
catch (Exception e) {
if (e instanceof MessageTransformationException) { // NOSONAR
throw (MessageTransformationException) e;
}
throw new MessageTransformationException(message, "Failed to transform Message in " + this, e);
}
}
更新 2
好的。我看到您使用 Spring Boot,并且没有将相应的 ErrorHandler
注册到聚合器中用于组超时功能的 TaskScheduler
。
请考虑将此 bean 添加到您的配置中:
@Bean
TaskSchedulerCustomizer taskSchedulerCustomizer(ErrorHandler integrationMessagePublishingErrorHandler) {
return taskScheduler -> taskScheduler.setErrorHandler(integrationMessagePublishingErrorHandler);
}
然后随时为 Spring 引导提出 GH 问题,使此自定义成为 auto-configuration 中的默认设置。
我们在集成流程中遇到了一个奇怪的行为,即 errorChannel 不会收到消息,以防在聚合 之后的步骤中抛出异常。
这是(减少的)流量:
@Bean
public StandardIntegrationFlow startKafkaInbound() {
return IntegrationFlows.from(Kafka
.messageDrivenChannelAdapter(
kafkaConsumerFactory,
ListenerMode.record,
serviceProperties.getInputTopic().getName())
.errorChannel(errorHandler.getInputChannel())
)
.channel(nextChannel().getInputChannel())
.get();
}
@Bean
public IntegrationFlow nextChannel() {
return IntegrationFlows.from("next")
.transform(Transformers.fromJson(MyObject.class)) // An exception here is sent to errorChannel
.aggregate(aggregatorSpec ->
aggregatorSpec
.releaseStrategy(new MessageCountReleaseStrategy(100))
.sendPartialResultOnExpiry(true)
.groupTimeout(2000L)
.expireGroupsUponCompletion(true)
.correlationStrategy(message -> KafkaHeaderUtils.getOrDefault(message.getHeaders(), MY_CORRELATION_HEADER, ""))
)
.transform(myObjectTransformer) // Exception here is not sent to errorChannel
.channel(acknowledgeMyObjectFlow().getInputChannel())
.get();
}
如果我们添加一个不是 DirectChannel 类型的显式通道,错误处理将按预期工作。工作代码如下:
// ...
.aggregate(aggregatorSpec -> ...)
.channel(MessageChannels.queue())
.transform(myObjectTransformer) // Now the exception is sent to errorChannel
.channel(acknowledgeMyObjectFlow().getInputChannel())
// ...
我们还想提一下,我们有一个非常相似的流程,其中 errorHandling 按预期工作(异常发送到 errorChannel)
所以我们实际上能够获得代码 运行,但是由于错误处理是应用程序中非常关键的部分,我们真的很想了解如何确保每个错误都将发送到配置的通道以及为什么显式设置 QueueChannel 会导致所需的行为。
提前致谢
你可以添加这个
.enrichHeaders(headers -> headers.header(MessageHeaders.ERROR_CHANNEL, (errorHandler.getInputChannel()))
在聚合器之前。
这里的 .channel(MessageChannels.queue())
是误导性的,因为错误被发送到全局 errorChannel
,这显然与你的相同 errorHandler.getInputChannel()
.
.groupTimeout(2000L)
的问题是在单独的 TaskScheduler
线程上完成的,当下游发生错误时,Kafka.messageDrivenChannelAdapter
中没有关于 try..catch
的知识。
欢迎提出 GH 问题,因此我们会考虑将 errorChannel
填充到来自 MessageProducerSupport
的消息 headers 中,例如 Kafka.messageDrivenChannelAdapter
。因此,无论下游流的异步性质如何,错误处理都是相同的。
更新
请尝试将此作为解决方案:
.transform(Transformers.fromJson(MyDataObject.class)) // An exception here is sent to errorChannel
.enrichHeaders(headers -> headers.header(MessageHeaders.ERROR_CHANNEL, (errorHandler.getInputChannel())))
.aggregate(aggregatorSpec ->
enrichHeaders()
应该可以确定发送错误的正确错误通道。
加上你的 MyDataObjectTransformer
必须修改为:
throw new MessageTransformationException(source, "test");
关键是端点捕获异常时有这样的逻辑:
if (handler != null) {
try {
handler.handleMessage(message);
return true;
}
catch (Exception e) {
throw IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
() -> "Dispatcher failed to deliver Message", e);
}
}
其中:
if (!(ex instanceof MessagingException) ||
((MessagingException) ex).getFailedMessage() == null) {
runtimeException = new MessageDeliveryException(message, text.get(), ex);
}
然后在 AbstractCorrelatingMessageHandler
:
catch (MessageDeliveryException ex) {
logger.warn(ex, () ->
"The MessageGroup [" + groupId +
"] is rescheduled by the reason of: ");
scheduleGroupToForceComplete(groupId);
}
这就是你的异常没有到达错误通道的原因。
您可以考虑不使用那个 MessageTransformationException
。包装处理程序中的逻辑是这样的:
protected Object handleRequestMessage(Message<?> message) {
try {
return this.transformer.transform(message);
}
catch (Exception e) {
if (e instanceof MessageTransformationException) { // NOSONAR
throw (MessageTransformationException) e;
}
throw new MessageTransformationException(message, "Failed to transform Message in " + this, e);
}
}
更新 2
好的。我看到您使用 Spring Boot,并且没有将相应的 ErrorHandler
注册到聚合器中用于组超时功能的 TaskScheduler
。
请考虑将此 bean 添加到您的配置中:
@Bean
TaskSchedulerCustomizer taskSchedulerCustomizer(ErrorHandler integrationMessagePublishingErrorHandler) {
return taskScheduler -> taskScheduler.setErrorHandler(integrationMessagePublishingErrorHandler);
}
然后随时为 Spring 引导提出 GH 问题,使此自定义成为 auto-configuration 中的默认设置。