Spring 集成 Java DSL:如何在出现错误情况后继续使用拆分和聚合方法?
Spring Integration Java DSL: How to continue after error situation with the split and the aggregate methods?
我的程序在高级中执行以下操作
Task 1
get the data from the System X
the Java DSL split
post the data to the System Y
post the reply data to the X
the Java DSL aggregate
Task 2
get the data from the System X
the Java DSL split
post the data to the System Y
post the reply data to the X
the Java DSL aggregate
...
问题是,当一个post the data to the System Y
子任务失败时,错误消息被正确地发送回System X,但之后任何其他子任务或任务都不会被执行。
我的错误处理程序是这样做的:
...
Message<String> newMessage = MessageBuilder.withPayload("error occurred")
.copyHeadersIfAbsent(message.getPayload().getFailedMessage().getHeaders()).build();
...
Set some extra headers etc.
...
return newMessage;
可能是什么问题?
编辑:
我调试了 Spring 集成。在错误情况下,只有第一条错误消息到达方法 AbstractCorrelatingMessageHandler.handleMessageInternal
。该方法不会出现其他成功和失败的消息。
如果没有错误,所有消息都会到达该方法,最后该组被释放。
我的程序可能有什么问题?
编辑 2:
这是有效的:
为 Http.outboundGateway
添加了 advice
:
.handle(Http.outboundGateway(...,
c -> c.advice(myAdvice()))
和 myAdvice
bean
@Bean
private Advice myAdvice() {
return new MyAdvice();
}
和 MyAdvice
class
public class MyAdvice<T> extends AbstractRequestHandlerAdvice {
@SuppressWarnings("unchecked")
@Override
protected Object doInvoke(final ExecutionCallback callback, final Object target, final Message<?> message)
throws Exception {
...
try {
result = (MessageBuilder<T>) callback.execute();
} catch (final MessageHandlingException e) {
take the exception cause for the new payload
}
return new message with the old headers and replyChannel header and result.payload or the exception cause as a payload
}
}
你的程序没有问题。这正是 Java 中常规循环的工作方式。要捕获每次迭代的异常并继续其他剩余项目,您肯定需要 Java 循环中的 try..catch
。因此,您需要在此处为分离器应用类似的东西。它可以通过 ExpressionEvaluatingRequestHandlerAdvice
、ExectutorChannel
作为分离器的输出或通过分离器输出通道上的服务激活器调用网关来实现。
由于之后的故事是关于聚合器的,您仍然需要以某种方式完成一个组,这只能通过从错误处理中发出一些错误补偿消息来完成到 return 回到聚合器的输入通道。在这种情况下,您需要确保将请求 headers 从 MessagingException
的 failedMessage
复制到错误流中。组聚合后,您需要切断来自正常消息的错误消息。这只能使用特殊有效负载来完成,或者您可能只是将异常作为有效负载,以便在聚合器的最终结果中正确区分正常消息中的错误。
我的程序在高级中执行以下操作
Task 1
get the data from the System X
the Java DSL split
post the data to the System Y
post the reply data to the X
the Java DSL aggregate
Task 2
get the data from the System X
the Java DSL split
post the data to the System Y
post the reply data to the X
the Java DSL aggregate
...
问题是,当一个post the data to the System Y
子任务失败时,错误消息被正确地发送回System X,但之后任何其他子任务或任务都不会被执行。
我的错误处理程序是这样做的:
...
Message<String> newMessage = MessageBuilder.withPayload("error occurred")
.copyHeadersIfAbsent(message.getPayload().getFailedMessage().getHeaders()).build();
...
Set some extra headers etc.
...
return newMessage;
可能是什么问题?
编辑:
我调试了 Spring 集成。在错误情况下,只有第一条错误消息到达方法 AbstractCorrelatingMessageHandler.handleMessageInternal
。该方法不会出现其他成功和失败的消息。
如果没有错误,所有消息都会到达该方法,最后该组被释放。
我的程序可能有什么问题?
编辑 2:
这是有效的:
为 Http.outboundGateway
添加了 advice
:
.handle(Http.outboundGateway(...,
c -> c.advice(myAdvice()))
和 myAdvice
bean
@Bean
private Advice myAdvice() {
return new MyAdvice();
}
和 MyAdvice
class
public class MyAdvice<T> extends AbstractRequestHandlerAdvice {
@SuppressWarnings("unchecked")
@Override
protected Object doInvoke(final ExecutionCallback callback, final Object target, final Message<?> message)
throws Exception {
...
try {
result = (MessageBuilder<T>) callback.execute();
} catch (final MessageHandlingException e) {
take the exception cause for the new payload
}
return new message with the old headers and replyChannel header and result.payload or the exception cause as a payload
}
}
你的程序没有问题。这正是 Java 中常规循环的工作方式。要捕获每次迭代的异常并继续其他剩余项目,您肯定需要 Java 循环中的 try..catch
。因此,您需要在此处为分离器应用类似的东西。它可以通过 ExpressionEvaluatingRequestHandlerAdvice
、ExectutorChannel
作为分离器的输出或通过分离器输出通道上的服务激活器调用网关来实现。
由于之后的故事是关于聚合器的,您仍然需要以某种方式完成一个组,这只能通过从错误处理中发出一些错误补偿消息来完成到 return 回到聚合器的输入通道。在这种情况下,您需要确保将请求 headers 从 MessagingException
的 failedMessage
复制到错误流中。组聚合后,您需要切断来自正常消息的错误消息。这只能使用特殊有效负载来完成,或者您可能只是将异常作为有效负载,以便在聚合器的最终结果中正确区分正常消息中的错误。