升级到 1.1.3 spring 集成 java dsl 后,错误消息从未传递到错误通道

Error messages never delivered to error channel after upgrade to 1.1.3 spring integration java dsl

我在我们的应用程序中使用了 spring 集成。最近我们尝试将我们的项目升级到 spring boot 1.4 release,它使用 spring java dsl 1.1.3。在最新版本中,他们弃用了 messageDriverChannelAdapter 并创建了一个名称为 messageDriverChannelAdapter 的新方法,即 'r' fixed.

集成流程在使用 java dsl 1.1.1 的 messageDriverChannelAdapter 时工作正常,但在失败情况下它在 1.1.3 中失败(即流程中发生异常)。

这是我的流程

IntegrationFlows
                .from(Jms.messageDriverChannelAdapter(listenerContainer()).errorChannel(errorChannel())
                        .outputChannel(listenerDirectChannel()))
                .channel(listenerDirectChannel()).transform(new JsonToChangeObjectTransformer())
                .channel(conversionOutChannel()).handle(CHANGED_OBJECT_LISTENER_IMPL, PROCESS_MESSAGE)
                .channel(errorChannel()).handle(FAILED_MESSAGE_HANDLER_IMPL, HANDLE_ERROR).get();

我什至试过路由

IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(listenerContainer())
                       .errorChannel(errorChannel())
                       .outputChannel(listenerDirectChannel()))
                       .channel(listenerDirectChannel())
                       .transform(new JsonToChangeObjectTransformer())
                       .<Object, Boolean> route(p -> p instanceof ChangedObject,
                        m -> m.channelMapping("true", "conversionOutChannel").channelMapping("false", "errorChannel"))
                         .channel(conversionOutChannel())
                       .handle(CHANGED_OBJECT_LISTENER_IMPL, PROCESS_MESSAGE).channel(errorChannel())
                       .handle(FAILED_MESSAGE_HANDLER_IMPL, HANDLE_ERROR).get();

但仍然没有运气

我的观察是消息从未传递到错误通道(正如我调试的那样)并且它一直试图将消息放入通道最终导致 Whosebugerror

异常:

org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:160)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:160)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)

IntegrarionFlow 定义不正确。 当您指定

.handle(CHANGED_OBJECT_LISTENER_IMPL, PROCESS_MESSAGE)
.channel(errorChannel())
.handle(FAILED_MESSAGE_HANDLER_IMPL, HANDLE_ERROR)

这意味着第一个 .handle() 的输出将被发送到 errorChannel。 好的结果是错误的过程。不好...

不确定这是不是您所期望的。

尝试分离错误流。例如:

@Bean
    public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(this.jmsConnectionFactory)
                        .errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
                        .destination("jmsMessageDriverRedelivery"))
                .<String, String>transform(p -> {
                    throw new RuntimeException("intentional");
                })
                .get();
    }

    @Bean
    public CountDownLatch redeliveryLatch() {
        return new CountDownLatch(3);
    }

    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
                .handle(m -> {
                    MessagingException exception = (MessagingException) m.getPayload();
                    redeliveryLatch().countDown();
                    throw exception;
                })
                .get();
    }

涉及:https://github.com/spring-projects/spring-integration-java-dsl/issues/101