Spring 集成:网关错误到消息的转换失败

Spring integration: failure of conversion of gateway error to message

我希望我的服务网关return以同步方式将其即时流(直到最近的异步通道)中的任何错误作为有效消息(不例外)发送给服务调用者。

整个流程都在一个线程上。我的最小示例拒绝按预期工作,我无法找到使用框架实现目标的正确方法。请看下面的代码。

@RunWith(SpringRunner.class)
public class ErrorHandlingTests {

    @Autowired
    ErrorsHandlingService errorsHandlingService;

    @EnableIntegration
    @Configuration
    static class Config {

        @Bean
        IntegrationFlow errorHandler() {
            return IntegrationFlows.from("errorChannel").
                    handle(errorMessage -> {
                        Message<?> failedMessage = ((MessagingException) errorMessage.getPayload()).getFailedMessage();
                        MessageChannel replyChannel = (MessageChannel) failedMessage.getHeaders().getReplyChannel();
                        replyChannel.send(new GenericMessage<>("Failure for " + failedMessage.getPayload()));
                    })
                    .get();
        }

        @Bean
        IntegrationFlow errorsHandlingFlow1() {
            return IntegrationFlows.from(ErrorsHandlingService.class, gws -> gws.errorChannel("errorChannel"))
                    .transform(new AbstractPayloadTransformer<String, String>() {
                        @Override
                        protected String transformPayload(String s) {
                            if (s.contains("oops"))
                                throw new IllegalArgumentException("Bad value");
                            return "R: " + s;
                        }
                    })
                    .get();
        }
    }


    @Test
    public void testErrorHandlingInFlow1() {
        assertEquals("R: a", errorsHandlingService.process("a"));
        assertEquals("Failure for oops", errorsHandlingService.process("oops"));
    }
}

此测试在第二个断言和日志打印时挂起:

W 210124 161538.597 [] [main] GenericMessagingTemplate$TemporaryReplyChannel - Reply message received but the receiving thread has exited due to an exception while sending the request message: GenericMessage [payload=Failure for oops, headers={id=57f79307-0778-88b4-9261-9040633cfc03, timestamp=1611494138597}]

所有这些都发生在最新的 5.3.4 版本上。

您不需要手动使用 replyChannel.send()。对于这样的错误处理和补偿回复生成你只需要这样做:

<MessagingException>handle((ex, h) -> "Failure for " + ex.getFailedMessage().getPayload())

网关中的逻辑是这样的:

  1. send-n-receive - 同步或异步 - 通过 replyChannel header.
  2. 如果前面的失败,捕获该异常并在 handleSendAndReceiveError()
  3. 中处理它
  4. 它采用为此网关配置的 errorChannel 并执行 ErrorMessage
  5. 的 send-n-receive
  6. sub-flow.
  7. 的回复应该是正常的

好吧,实际问题是您使用了 failedMessage 中的 replyChannel,这不再有效,因为主流已经失败。在这一点上,我们处理新的错误消息流,我们真的应该依赖它的 headers 了。或者让框架给我们做回复关联!