How to store the original message in a JMS queue instead of as an org.springframework.messaging.MessageDeliveryException

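I am using Spring Integration for the first time.

Scenario:

  1. Read a message from the source JMS queue, then push it into the channel used by the target JMS queue. If the target JNDI name is wrong, the retry mechanism kicks in.
  2. Once the retries are exhausted, the message is pushed to the exception JMS queue.
  3. All queues are on my local WebLogic 12c. The application is Spring Boot 2.1.10.RELEASE + Spring Integration 5.1.9.RELEASE.
  4. When the target JMS queue's JNDI name is correct, the message is delivered successfully. This works, and I can read the message because I set extractPayload to true.

Problem:

  1. I can see the message in the exception queue, but it is stored as an org.springframework.messaging.MessageDeliveryException and cannot be read the way it can in point 4 above. Is it possible to store the original message in the exception queue instead of the org.springframework.messaging.MessageDeliveryException?
  2. How can I achieve this with the code below?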

Code: [updated with the solution]

@Configuration
@EnableRetry
@EnableIntegration
@Slf4j
public class JmsRoutingConfig {
    @Bean(name = "executor")
    ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        ...
        return threadPoolTaskExecutor;
    }

    public RequestHandlerRetryAdvice retryAdvice() {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);

        ExponentialBackOffPolicy exponentialBackOff = new ExponentialBackOffPolicy();
        exponentialBackOff.setInitialInterval(5000);
        exponentialBackOff.setMultiplier(2);
        exponentialBackOff.setMaxInterval(30 * 1000);

        RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setBackOffPolicy(exponentialBackOff);
        retryTemplate.setRetryPolicy(retryPolicy);

        RecoveryCallback<Object> recoveryCallBack = new ErrorMessageSendingRecoverer(exceptionQueueChannel());
        retryAdvice.setRecoveryCallback(recoveryCallBack);
        retryAdvice.setRetryTemplate(retryTemplate);
        return retryAdvice;
    }

    @Bean(name = "exceptionQueueChannel")
    public PublishSubscribeChannel exceptionQueueChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean(name = "errorChannel")
    public PublishSubscribeChannel errorChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(errorChannel()).transform("payload.failedMessage").routeByException(r -> r.advice(retryAdvice())).get();
    }

    @Bean
    public IntegrationFlow exceptionHandlingFlow(@Qualifier("exceptionJmsTemplate") JmsTemplate exceptionJmsTemplate) {
        return IntegrationFlows.from(exceptionQueueChannel())
                .transform("payload.failedMessage")
                .handle(Jms.outboundAdapter(exceptionJmsTemplate).extractPayload(true).get())
                .get();
    }

    @Bean(name = "multiSubcriptionChannel")
    public PublishSubscribeChannel multiSubcriptionChannel(@Qualifier("executor") ThreadPoolTaskExecutor executor) {
        return new PublishSubscribeChannel(executor);
    }

    @Bean(name = "headerValueRouter")
    public HeaderValueRouter headerValueRouter(@Qualifier("multiSubcriptionChannel") PublishSubscribeChannel multiSubcriptionChannel) {
        multiSubcriptionChannel.subscribe(Jms.outboundAdapter(getTargetJmsTemplate()).extractPayload(true).get());
        HeaderValueRouter router = new HeaderValueRouter("jms_correlationId");
        router.setChannelMapping("77", multiSubcriptionChannel.getBeanName());
        return router;
    }

    @Bean
    public IntegrationFlow inFlow(@Qualifier("headerValueRouter") HeaderValueRouter headerValueRouter) {
        return IntegrationFlows.from(jmsInboundChannel()).route(headerValueRouter).get();
    }

    @Bean(name = "listenerErrorChannel")
    public PublishSubscribeChannel listenerErrorChannel() {
        return new PublishSubscribeChannel();
    }

    @Bean
    public IntegrationFlow listenerErrorHandlingFlow() {
        return IntegrationFlows.from(listenerErrorChannel())
                .handle(message -> log.error("LISTENER error (Headers): [{}], (Payload): [{}]",
                        ((MessagingException) message.getPayload()).getFailedMessage().getHeaders(),
                        ((MessagingException) message.getPayload()).getFailedMessage().getPayload())).get();
    }

    @Bean(name = "jmsInboundChannel")
    public JmsMessageDrivenEndpoint jmsInboundChannel() {

        return Jms.messageDrivenChannelAdapter(routingCachingJmsConnectionFactory())
                .configureListenerContainer(
                        defaultMessageListenerContainerJmsListenerContainerSpec ->
                                defaultMessageListenerContainerJmsListenerContainerSpec
                                        .destinationResolver(routingJmsDestinationResolver()).acceptMessagesWhileStopping(false)
                                        .sessionTransacted(true).sessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE)
                                        .concurrency(routingSourceJmsLowerUpperConcurrentThreadCount).concurrentConsumers(3).maxConcurrentConsumers(3))
                .errorChannel(listenerErrorChannel()).shutdownContainerOnStop(false)
                .destination(routingSourceJmsQueue).autoStartup(true)
                .get();
    }
...
}

Logs:

15:28:20.839 [DefaultMessageListenerContainer-3] DEBUG o.s.j.l.DefaultMessageListenerContainer - Received message of type [class weblogic.jms.common.TextMessageImpl] from consumer [weblogic.jms.client.WLConsumerImpl@12af28a8] of session [Cached JMS Session: weblogic.jms.client.WLSessionImpl@4e918fd1]
15:28:20.841 [DefaultMessageListenerContainer-3] DEBUG o.s.i.j.ChannelPublishingJmsMessageListener - converted JMS Message [TextMessage[ID:<322457.1582057700831.0>, WWW]] to integration Message payload [WWW]
15:28:20.845 [DefaultMessageListenerContainer-3] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'inFlow.channel#0', message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.846 [DefaultMessageListenerContainer-3] DEBUG o.s.i.router.HeaderValueRouter - headerValueRouter received message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.846 [DefaultMessageListenerContainer-3] DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'multiSubcriptionChannel', message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.848 [executor-1] DEBUG o.s.i.jms.JmsSendingMessageHandler - org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11 received message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.849 [DefaultMessageListenerContainer-3] DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'multiSubcriptionChannel', message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.850 [DefaultMessageListenerContainer-3] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'inFlow.channel#0', message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.864 [executor-1] DEBUG o.s.jms.core.JmsTemplate - Executing callback on JMS Session: weblogic.jms.client.WLSessionImpl@24067d07
15:28:20.865 [executor-1] DEBUG o.springframework.jndi.JndiTemplate - Looking up JNDI object with name [jdbc/NON_EXISTING_QUEUE]
15:28:20.903 [executor-1] DEBUG o.s.j.s.d.JndiDestinationResolver - Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI
javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc'
    at weblogic.rjvm.ResponseImpl.unmarshalReturn(ResponseImpl.java:234)
    at weblogic.rmi.cluster.ClusterableRemoteRef.invoke(ClusterableRemoteRef.java:348)
    at weblogic.rmi.cluster.ClusterableRemoteRef.invoke(ClusterableRemoteRef.java:259)
    at weblogic.jndi.internal.ServerNamingNode_1034_WLStub.lookup(Unknown Source)
    at weblogic.jndi.internal.WLContextImpl.lookup(WLContextImpl.java:423)
    at weblogic.jndi.internal.WLContextImpl.lookup(WLContextImpl.java:411)
    at javax.naming.InitialContext.lookup(InitialContext.java:417)
    at org.springframework.jndi.JndiTemplate.lambda$lookup$0(JndiTemplate.java:157)
    at org.springframework.jndi.JndiTemplate.execute(JndiTemplate.java:92)
    at org.springframework.jndi.JndiTemplate.lookup(JndiTemplate.java:157)
    at org.springframework.jndi.JndiTemplate.lookup(JndiTemplate.java:179)
    at org.springframework.jndi.JndiLocatorSupport.lookup(JndiLocatorSupport.java:96)
    at org.springframework.jms.support.destination.JndiDestinationResolver.resolveDestinationName(JndiDestinationResolver.java:114)
    at org.springframework.jms.support.destination.JmsDestinationAccessor.resolveDestinationName(JmsDestinationAccessor.java:115)
    at org.springframework.jms.core.JmsTemplate.lambda$send(JmsTemplate.java:585)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:504)
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:584)
    at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:691)
    at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:671)
    at org.springframework.integration.jms.JmsSendingMessageHandler.send(JmsSendingMessageHandler.java:219)
    at org.springframework.integration.jms.JmsSendingMessageHandler.handleMessageInternal(JmsSendingMessageHandler.java:184)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.access$000(BroadcastingDispatcher.java:56)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher$1.run(BroadcastingDispatcher.java:204)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc'
    at weblogic.jndi.internal.BasicNamingNode.newNameNotFoundException(BasicNamingNode.java:1180)
    at weblogic.jndi.internal.BasicNamingNode.lookupHere(BasicNamingNode.java:270)
    ...
    at weblogic.security.acl.internal.AuthenticatedSubject.doAs(AuthenticatedSubject.java:363)
    at weblogic.security.service.SecurityManager.runAs(SecurityManager.java:146)
    at weblogic.rmi.internal.BasicServerRef.handleRequest(BasicServerRef.java:523)
    at weblogic.rmi.internal.wls.WLSExecuteRequest.run(WLSExecuteRequest.java:118)
    at weblogic.work.ExecuteThread.execute(ExecuteThread.java:311)
    at weblogic.work.ExecuteThread.run(ExecuteThread.java:263)
15:28:20.909 [executor-1] DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
15:28:20.915 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Retry: count=0
15:28:20.916 [executor-1] DEBUG o.s.i.r.ErrorMessageExceptionTypeRouter - errorHandlingFlow.router#0 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
15:28:20.917 [executor-1] DEBUG o.s.r.b.ExponentialBackOffPolicy - Sleeping for 5000
15:28:25.918 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=1
15:28:25.918 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Retry: count=1
15:28:25.919 [executor-1] DEBUG o.s.i.r.ErrorMessageExceptionTypeRouter - errorHandlingFlow.router#0 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
15:28:25.919 [executor-1] DEBUG o.s.r.b.ExponentialBackOffPolicy - Sleeping for 10000
15:28:35.919 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=2
15:28:35.919 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Retry: count=2
15:28:35.920 [executor-1] DEBUG o.s.i.r.ErrorMessageExceptionTypeRouter - errorHandlingFlow.router#0 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
15:28:35.920 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=3
15:28:35.920 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Retry failed last attempt: count=3
15:28:35.921 [executor-1] DEBUG o.s.i.h.a.ErrorMessageSendingRecoverer - Sending ErrorMessage: failedMessage: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined.
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:198)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.cloneAndExecute(AbstractRequestHandlerAdvice.java:93)
    at org.springframework.integration.handler.advice.RequestHandlerRetryAdvice.lambda$doInvoke(RequestHandlerRetryAdvice.java:82)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.integration.handler.advice.RequestHandlerRetryAdvice.doInvoke(RequestHandlerRetryAdvice.java:82)
    at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:70)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
    at com.sun.proxy.$Proxy93.handleMessage(Unknown Source)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:95)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:60)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
15:28:35.921 [executor-1] DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'exceptionQueueChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined., failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}], headers={id=7d86b3e2-287f-e867-af54-0bb6744c2853, timestamp=1582057715920}]
15:28:35.921 [executor-1] DEBUG o.s.i.jms.JmsSendingMessageHandler - org.springframework.integration.jms.JmsSendingMessageHandler@38b61501 received message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined., failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}], headers={id=7d86b3e2-287f-e867-af54-0bb6744c2853, timestamp=1582057715920}]
15:28:35.923 [executor-1] DEBUG o.s.jms.core.JmsTemplate - Executing callback on JMS Session: weblogic.jms.client.WLSessionImpl@e0750f9
15:28:35.924 [executor-1] DEBUG o.springframework.jndi.JndiTemplate - Looking up JNDI object with name [jdbc/JMSExceptionQueue]
15:28:35.926 [executor-1] DEBUG o.s.j.s.d.JndiDestinationResolver - Located object with JNDI name [jdbc/JMSExceptionQueue]
15:28:35.944 [executor-1] DEBUG o.s.jms.core.JmsTemplate - Sending created message: ObjectMessage[null,org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined., failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]]
15:28:35.955 [executor-1] DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'exceptionQueueChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined., failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}], headers={id=7d86b3e2-287f-e867-af54-0bb6744c2853, timestamp=1582057715920}]
15:28:35.955 [executor-1] DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]

Just as you already do in your logging handler:

@Bean
public IntegrationFlow listenerErrorHandlingFlow() {
    return IntegrationFlows.from(listenerErrorChannel())
            .handle(message -> log.error("LISTENER error (Headers): [{}], (Payload): [{}]",
                    ((MessagingException) message.getPayload()).getFailedMessage().getHeaders(),
                    ((MessagingException) message.getPayload()).getFailedMessage().getPayload())).get();
}

You need to add a transformer to the error flow that converts the message by extracting the failedMessage from the exception.

.transform("payload.failedMessage")
.handle(Jms...)
.get();
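
One detail worth checking against the log in the question: the message that reaches exceptionQueueChannel there is wrapped twice (a MessageDeliveryException whose failedMessage is itself an ErrorMessage carrying a MessageHandlingException, whose failedMessage is the original GenericMessage with payload WWW). With nesting like that, a single extraction step returns the inner ErrorMessage, not the original message. The following is a minimal sketch of unwrapping until a non-exception payload is reached. It is plain Java with stand-in types: Msg and FailedException are hypothetical simplifications for illustration, not Spring Integration classes.

```java
public class FailedMessageUnwrap {

    /** Hypothetical stand-in for a message: only the payload is modelled. */
    static final class Msg {
        final Object payload;

        Msg(Object payload) {
            this.payload = payload;
        }
    }

    /** Hypothetical stand-in for an exception that remembers the message it failed on. */
    static final class FailedException extends RuntimeException {
        final Msg failedMessage;

        FailedException(String description, Msg failedMessage) {
            super(description);
            this.failedMessage = failedMessage;
        }
    }

    /** Follows payload -> failedMessage until the payload is no longer an exception. */
    static Msg unwrap(Msg message) {
        Msg current = message;
        while (current.payload instanceof FailedException) {
            Msg inner = ((FailedException) current.payload).failedMessage;
            if (inner == null) {
                break; // nothing further to unwrap; keep the last message we have
            }
            current = inner;
        }
        return current;
    }

    public static void main(String[] args) {
        // Rebuild the nesting seen in the log: original <- first error <- second error.
        Msg original = new Msg("WWW");
        Msg firstError = new Msg(new FailedException("send to target queue failed", original));
        Msg secondError = new Msg(new FailedException("no channel resolved by router", firstError));

        System.out.println(unwrap(secondError).payload); // prints WWW
    }
}
```

In a real flow the same loop could be written in a transformer against MessagingException and its getFailedMessage() method. Whether more than one level of wrapping actually occurs depends on how the error flow is wired, so treat this as something to verify in your own logs.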