Spring 集成:网关错误到消息的转换失败
Spring integration: failure of conversion of gateway error to message
我希望我的服务网关return以同步方式将其即时流(直到最近的异步通道)中的任何错误作为有效消息(不例外)发送给服务调用者。
整个流程都在一个线程上。我的最小示例拒绝按预期工作,我无法找到使用框架实现目标的正确方法。请看下面的代码。
@RunWith(SpringRunner.class)
public class ErrorHandlingTests {
@Autowired
ErrorsHandlingService errorsHandlingService;
@EnableIntegration
@Configuration
static class Config {
@Bean
IntegrationFlow errorHandler() {
return IntegrationFlows.from("errorChannel").
handle(errorMessage -> {
Message<?> failedMessage = ((MessagingException) errorMessage.getPayload()).getFailedMessage();
MessageChannel replyChannel = (MessageChannel) failedMessage.getHeaders().getReplyChannel();
replyChannel.send(new GenericMessage<>("Failure for " + failedMessage.getPayload()));
})
.get();
}
@Bean
IntegrationFlow errorsHandlingFlow1() {
return IntegrationFlows.from(ErrorsHandlingService.class, gws -> gws.errorChannel("errorChannel"))
.transform(new AbstractPayloadTransformer<String, String>() {
@Override
protected String transformPayload(String s) {
if (s.contains("oops"))
throw new IllegalArgumentException("Bad value");
return "R: " + s;
}
})
.get();
}
}
@Test
public void testErrorHandlingInFlow1() {
assertEquals("R: a", errorsHandlingService.process("a"));
assertEquals("Failure for oops", errorsHandlingService.process("oops"));
}
}
此测试在第二个断言和日志打印时挂起:
W 210124 161538.597 [] [main] GenericMessagingTemplate$TemporaryReplyChannel - Reply message received but the receiving thread has exited due to an exception while sending the request message: GenericMessage [payload=Failure for oops, headers={id=57f79307-0778-88b4-9261-9040633cfc03, timestamp=1611494138597}]
所有这些都发生在最新的 5.3.4 版本上。
您不需要手动使用 replyChannel.send()
。对于这样的错误处理和补偿回复生成你只需要这样做:
<MessagingException>handle((ex, h) -> "Failure for " + ex.getFailedMessage().getPayload())
网关中的逻辑是这样的:
- send-n-receive - 同步或异步 - 通过
replyChannel
header.
- 如果前面的失败,捕获该异常并在
handleSendAndReceiveError()
中处理它
- 它采用为此网关配置的
errorChannel
并执行 ErrorMessage
的 send-n-receive
- sub-flow.
的回复应该是正常的
好吧,实际问题是您使用了 failedMessage
中的 replyChannel
,这不再有效,因为主流已经失败。在这一点上,我们处理新的错误消息流,我们真的应该依赖它的 headers 了。或者让框架给我们做回复关联!
我希望我的服务网关return以同步方式将其即时流(直到最近的异步通道)中的任何错误作为有效消息(不例外)发送给服务调用者。
整个流程都在一个线程上。我的最小示例拒绝按预期工作,我无法找到使用框架实现目标的正确方法。请看下面的代码。
@RunWith(SpringRunner.class)
public class ErrorHandlingTests {
@Autowired
ErrorsHandlingService errorsHandlingService;
@EnableIntegration
@Configuration
static class Config {
@Bean
IntegrationFlow errorHandler() {
return IntegrationFlows.from("errorChannel").
handle(errorMessage -> {
Message<?> failedMessage = ((MessagingException) errorMessage.getPayload()).getFailedMessage();
MessageChannel replyChannel = (MessageChannel) failedMessage.getHeaders().getReplyChannel();
replyChannel.send(new GenericMessage<>("Failure for " + failedMessage.getPayload()));
})
.get();
}
@Bean
IntegrationFlow errorsHandlingFlow1() {
return IntegrationFlows.from(ErrorsHandlingService.class, gws -> gws.errorChannel("errorChannel"))
.transform(new AbstractPayloadTransformer<String, String>() {
@Override
protected String transformPayload(String s) {
if (s.contains("oops"))
throw new IllegalArgumentException("Bad value");
return "R: " + s;
}
})
.get();
}
}
@Test
public void testErrorHandlingInFlow1() {
assertEquals("R: a", errorsHandlingService.process("a"));
assertEquals("Failure for oops", errorsHandlingService.process("oops"));
}
}
此测试在第二个断言和日志打印时挂起:
W 210124 161538.597 [] [main] GenericMessagingTemplate$TemporaryReplyChannel - Reply message received but the receiving thread has exited due to an exception while sending the request message: GenericMessage [payload=Failure for oops, headers={id=57f79307-0778-88b4-9261-9040633cfc03, timestamp=1611494138597}]
所有这些都发生在最新的 5.3.4 版本上。
您不需要手动使用 replyChannel.send()
。对于这样的错误处理和补偿回复生成你只需要这样做:
<MessagingException>handle((ex, h) -> "Failure for " + ex.getFailedMessage().getPayload())
网关中的逻辑是这样的:
- send-n-receive - 同步或异步 - 通过
replyChannel
header. - 如果前面的失败,捕获该异常并在
handleSendAndReceiveError()
中处理它
- 它采用为此网关配置的
errorChannel
并执行ErrorMessage
的 send-n-receive
- sub-flow. 的回复应该是正常的
好吧,实际问题是您使用了 failedMessage
中的 replyChannel
,这不再有效,因为主流已经失败。在这一点上,我们处理新的错误消息流,我们真的应该依赖它的 headers 了。或者让框架给我们做回复关联!