如何将原始消息存储到 JMS 队列而不是 org.springframework.messaging.MessageDeliveryException 形式
How to store original message into JMS Queue instead of org.springframework.messaging.MessageDeliveryException form
我是第一次使用Spring集成
场景:
- 从源 JMS 队列读取消息,然后推入目标 JMS 队列使用的通道。如果目标 Jndi 名称错误,重试机制将启动。
- 重试结束后,消息被推入异常 JMS 队列。
- 所有队列都在我的本地Weblogic 12c。该应用程序是 Spring Boot 2.1.10.RELEASE + Spring Integration 5.1.9.RELEASE
- 当目标JMS Queue 的Jndi 正确时,消息发送成功。这是有效的,我可以看到消息,因为我将提取有效负载设置为 true。
问题:
- 我可以在异常队列中看到该消息,但它的格式为 org.springframework.messaging.MessageDeliveryException,无法查看就像上面的第 4 点一样。是否可以将原始消息存储在异常队列中,而不是 org.springframework.messaging.MessageDeliveryException 形式?
- 如何使用下面的代码实现此目的?
代码:[更新解决方案]
@Configuration
@EnableRetry
@EnableIntegration
@Slf4j
public class JmsRoutingConfig {
@Bean(name = "executor")
ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
...
return threadPoolTaskExecutor;
}
public RequestHandlerRetryAdvice retryAdvice() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
ExponentialBackOffPolicy exponentialBackOff = new ExponentialBackOffPolicy();
exponentialBackOff.setInitialInterval(5000);
exponentialBackOff.setMultiplier(2);
exponentialBackOff.setMaxInterval(30 * 1000);
RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(exponentialBackOff);
retryTemplate.setRetryPolicy(retryPolicy);
RecoveryCallback<Object> recoveryCallBack = new ErrorMessageSendingRecoverer(exceptionQueueChannel());
retryAdvice.setRecoveryCallback(recoveryCallBack);
retryAdvice.setRetryTemplate(retryTemplate);
return retryAdvice;
}
@Bean(name = "exceptionQueueChannel")
public PublishSubscribeChannel exceptionQueueChannel() {
return new PublishSubscribeChannel();
}
@Bean(name = "errorChannel")
public PublishSubscribeChannel errorChannel() {
return new PublishSubscribeChannel();
}
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(errorChannel()).transform("payload.failedMessage").routeByException(r -> r.advice(retryAdvice())).get();
}
@Bean
public IntegrationFlow exceptionHandlingFlow(@Qualifier("exceptionJmsTemplate") JmsTemplate exceptionJmsTemplate) {
return IntegrationFlows.from(exceptionQueueChannel()).transform("payload.failedMessage").handle(Jms.outboundAdapter(exceptionJmsTemplate).extractPayload(true).get())
.get();
}
@Bean(name = "multiSubcriptionChannel")
public PublishSubscribeChannel multiSubcriptionChannel(@Qualifier("executor") ThreadPoolTaskExecutor executor) {
return new PublishSubscribeChannel(executor);
}
@Bean(name = "headerValueRouter")
public HeaderValueRouter headerValueRouter(@Qualifier("multiSubcriptionChannel") PublishSubscribeChannel multiSubcriptionChannel) {
multiSubcriptionChannel.subscribe(Jms.outboundAdapter(getTargetJmsTemplate()).extractPayload(true).get());
HeaderValueRouter router = new HeaderValueRouter("jms_correlationId");
router.setChannelMapping("77", multiSubcriptionChannel.getBeanName());
return router;
}
@Bean
public IntegrationFlow inFlow(@Qualifier("headerValueRouter") HeaderValueRouter headerValueRouter) {
return IntegrationFlows.from(jmsInboundChannel()).route(headerValueRouter).get();
}
@Bean(name = "listenerErrorChannel")
public PublishSubscribeChannel listenerErrorChannel() {
return new PublishSubscribeChannel();
}
@Bean
public IntegrationFlow listenerErrorHandlingFlow() {
return IntegrationFlows.from(listenerErrorChannel())
.handle(message -> log.error("LISTENER error (Headers): [{}], (Payload): [{}]",
((MessagingException) message.getPayload()).getFailedMessage().getHeaders(),
((MessagingException) message.getPayload()).getFailedMessage().getPayload())).get();
}
@Bean(name = "jmsInboundChannel")
public JmsMessageDrivenEndpoint jmsInboundChannel() {
return Jms.messageDrivenChannelAdapter(routingCachingJmsConnectionFactory()).
configureListenerContainer(
defaultMessageListenerContainerJmsListenerContainerSpec ->
defaultMessageListenerContainerJmsListenerContainerSpec
.destinationResolver(routingJmsDestinationResolver()).acceptMessagesWhileStopping(false)
.sessionTransacted(true).sessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE)
.concurrency(routingSourceJmsLowerUpperConcurrentThreadCount).concurrentConsumers(3).maxConcurrentConsumers(3))
.errorChannel(listenerErrorChannel()).shutdownContainerOnStop(false)
.destination(routingSourceJmsQueue).autoStartup(true)
.get();
}
...
}
日志:
15:28:20.839 [DefaultMessageListenerContainer-3] DEBUG o.s.j.l.DefaultMessageListenerContainer - Received message of type [class weblogic.jms.common.TextMessageImpl] from consumer [weblogic.jms.client.WLConsumerImpl@12af28a8] of session [Cached JMS Session: weblogic.jms.client.WLSessionImpl@4e918fd1]
15:28:20.841 [DefaultMessageListenerContainer-3] DEBUG o.s.i.j.ChannelPublishingJmsMessageListener - converted JMS Message [TextMessage[ID:<322457.1582057700831.0>, WWW]] to integration Message payload [WWW]
15:28:20.845 [DefaultMessageListenerContainer-3] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'inFlow.channel#0', message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.846 [DefaultMessageListenerContainer-3] DEBUG o.s.i.router.HeaderValueRouter - headerValueRouter received message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.846 [DefaultMessageListenerContainer-3] DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'multiSubcriptionChannel', message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.848 [executor-1] DEBUG o.s.i.jms.JmsSendingMessageHandler - org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11 received message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.849 [DefaultMessageListenerContainer-3] DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'multiSubcriptionChannel', message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.850 [DefaultMessageListenerContainer-3] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'inFlow.channel#0', message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.864 [executor-1] DEBUG o.s.jms.core.JmsTemplate - Executing callback on JMS Session: weblogic.jms.client.WLSessionImpl@24067d07
15:28:20.865 [executor-1] DEBUG o.springframework.jndi.JndiTemplate - Looking up JNDI object with name [jdbc/NON_EXISTING_QUEUE]
15:28:20.903 [executor-1] DEBUG o.s.j.s.d.JndiDestinationResolver - Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI
javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc'
at weblogic.rjvm.ResponseImpl.unmarshalReturn(ResponseImpl.java:234)
at weblogic.rmi.cluster.ClusterableRemoteRef.invoke(ClusterableRemoteRef.java:348)
at weblogic.rmi.cluster.ClusterableRemoteRef.invoke(ClusterableRemoteRef.java:259)
at weblogic.jndi.internal.ServerNamingNode_1034_WLStub.lookup(Unknown Source)
at weblogic.jndi.internal.WLContextImpl.lookup(WLContextImpl.java:423)
at weblogic.jndi.internal.WLContextImpl.lookup(WLContextImpl.java:411)
at javax.naming.InitialContext.lookup(InitialContext.java:417)
at org.springframework.jndi.JndiTemplate.lambda$lookup[=12=](JndiTemplate.java:157)
at org.springframework.jndi.JndiTemplate.execute(JndiTemplate.java:92)
at org.springframework.jndi.JndiTemplate.lookup(JndiTemplate.java:157)
at org.springframework.jndi.JndiTemplate.lookup(JndiTemplate.java:179)
at org.springframework.jndi.JndiLocatorSupport.lookup(JndiLocatorSupport.java:96)
at org.springframework.jms.support.destination.JndiDestinationResolver.resolveDestinationName(JndiDestinationResolver.java:114)
at org.springframework.jms.support.destination.JmsDestinationAccessor.resolveDestinationName(JmsDestinationAccessor.java:115)
at org.springframework.jms.core.JmsTemplate.lambda$send(JmsTemplate.java:585)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:504)
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:584)
at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:691)
at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:671)
at org.springframework.integration.jms.JmsSendingMessageHandler.send(JmsSendingMessageHandler.java:219)
at org.springframework.integration.jms.JmsSendingMessageHandler.handleMessageInternal(JmsSendingMessageHandler.java:184)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.access[=12=]0(BroadcastingDispatcher.java:56)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.run(BroadcastingDispatcher.java:204)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute[=12=](ErrorHandlingTaskExecutor.java:57)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc'
at weblogic.jndi.internal.BasicNamingNode.newNameNotFoundException(BasicNamingNode.java:1180)
at weblogic.jndi.internal.BasicNamingNode.lookupHere(BasicNamingNode.java:270)
...
at weblogic.security.acl.internal.AuthenticatedSubject.doAs(AuthenticatedSubject.java:363)
at weblogic.security.service.SecurityManager.runAs(SecurityManager.java:146)
at weblogic.rmi.internal.BasicServerRef.handleRequest(BasicServerRef.java:523)
at weblogic.rmi.internal.wls.WLSExecuteRequest.run(WLSExecuteRequest.java:118)
at weblogic.work.ExecuteThread.execute(ExecuteThread.java:311)
at weblogic.work.ExecuteThread.run(ExecuteThread.java:263)
15:28:20.909 [executor-1] DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
15:28:20.915 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Retry: count=0
15:28:20.916 [executor-1] DEBUG o.s.i.r.ErrorMessageExceptionTypeRouter - errorHandlingFlow.router#0 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
15:28:20.917 [executor-1] DEBUG o.s.r.b.ExponentialBackOffPolicy - Sleeping for 5000
15:28:25.918 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=1
15:28:25.918 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Retry: count=1
15:28:25.919 [executor-1] DEBUG o.s.i.r.ErrorMessageExceptionTypeRouter - errorHandlingFlow.router#0 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
15:28:25.919 [executor-1] DEBUG o.s.r.b.ExponentialBackOffPolicy - Sleeping for 10000
15:28:35.919 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=2
15:28:35.919 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Retry: count=2
15:28:35.920 [executor-1] DEBUG o.s.i.r.ErrorMessageExceptionTypeRouter - errorHandlingFlow.router#0 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
15:28:35.920 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=3
15:28:35.920 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Retry failed last attempt: count=3
15:28:35.921 [executor-1] DEBUG o.s.i.h.a.ErrorMessageSendingRecoverer - Sending ErrorMessage: failedMessage: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined.
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:198)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.cloneAndExecute(AbstractRequestHandlerAdvice.java:93)
at org.springframework.integration.handler.advice.RequestHandlerRetryAdvice.lambda$doInvoke(RequestHandlerRetryAdvice.java:82)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.integration.handler.advice.RequestHandlerRetryAdvice.doInvoke(RequestHandlerRetryAdvice.java:82)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:70)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
at com.sun.proxy.$Proxy93.handleMessage(Unknown Source)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:95)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute[=12=](ErrorHandlingTaskExecutor.java:60)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
15:28:35.921 [executor-1] DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'exceptionQueueChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined., failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}], headers={id=7d86b3e2-287f-e867-af54-0bb6744c2853, timestamp=1582057715920}]
15:28:35.921 [executor-1] DEBUG o.s.i.jms.JmsSendingMessageHandler - org.springframework.integration.jms.JmsSendingMessageHandler@38b61501 received message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined., failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}], headers={id=7d86b3e2-287f-e867-af54-0bb6744c2853, timestamp=1582057715920}]
15:28:35.923 [executor-1] DEBUG o.s.jms.core.JmsTemplate - Executing callback on JMS Session: weblogic.jms.client.WLSessionImpl@e0750f9
15:28:35.924 [executor-1] DEBUG o.springframework.jndi.JndiTemplate - Looking up JNDI object with name [jdbc/JMSExceptionQueue]
15:28:35.926 [executor-1] DEBUG o.s.j.s.d.JndiDestinationResolver - Located object with JNDI name [jdbc/JMSExceptionQueue]
15:28:35.944 [executor-1] DEBUG o.s.jms.core.JmsTemplate - Sending created message: ObjectMessage[null,org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined., failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]]
15:28:35.955 [executor-1] DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'exceptionQueueChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined., failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}], headers={id=7d86b3e2-287f-e867-af54-0bb6744c2853, timestamp=1582057715920}]
15:28:35.955 [executor-1] DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
就像您在日志记录处理程序中所做的那样:
@Bean
public IntegrationFlow listenerErrorHandlingFlow() {
return IntegrationFlows.from(listenerErrorChannel())
.handle(message -> log.error("LISTENER error (Headers): [{}], (Payload): [{}]",
((MessagingException) message.getPayload()).getFailedMessage().getHeaders(),
((MessagingException) message.getPayload()).getFailedMessage().getPayload())).get();
}
您需要向错误流中添加一个转换器,以通过从异常中提取 failedMessage
来转换消息。
.transform("payload.failedMessage")
.handle(Jms...)
.get();
我是第一次使用Spring集成
场景:
- 从源 JMS 队列读取消息,然后推入目标 JMS 队列使用的通道。如果目标 Jndi 名称错误,重试机制将启动。
- 重试结束后,消息被推入异常 JMS 队列。
- 所有队列都在我的本地Weblogic 12c。该应用程序是 Spring Boot 2.1.10.RELEASE + Spring Integration 5.1.9.RELEASE
- 当目标JMS Queue 的Jndi 正确时,消息发送成功。这是有效的,我可以看到消息,因为我将提取有效负载设置为 true。
问题:
- 我可以在异常队列中看到该消息,但它的格式为 org.springframework.messaging.MessageDeliveryException,无法查看就像上面的第 4 点一样。是否可以将原始消息存储在异常队列中,而不是 org.springframework.messaging.MessageDeliveryException 形式?
- 如何使用下面的代码实现此目的?
代码:[更新解决方案]
@Configuration
@EnableRetry
@EnableIntegration
@Slf4j
public class JmsRoutingConfig {
@Bean(name = "executor")
ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
...
return threadPoolTaskExecutor;
}
public RequestHandlerRetryAdvice retryAdvice() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
ExponentialBackOffPolicy exponentialBackOff = new ExponentialBackOffPolicy();
exponentialBackOff.setInitialInterval(5000);
exponentialBackOff.setMultiplier(2);
exponentialBackOff.setMaxInterval(30 * 1000);
RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(exponentialBackOff);
retryTemplate.setRetryPolicy(retryPolicy);
RecoveryCallback<Object> recoveryCallBack = new ErrorMessageSendingRecoverer(exceptionQueueChannel());
retryAdvice.setRecoveryCallback(recoveryCallBack);
retryAdvice.setRetryTemplate(retryTemplate);
return retryAdvice;
}
@Bean(name = "exceptionQueueChannel")
public PublishSubscribeChannel exceptionQueueChannel() {
return new PublishSubscribeChannel();
}
@Bean(name = "errorChannel")
public PublishSubscribeChannel errorChannel() {
return new PublishSubscribeChannel();
}
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(errorChannel()).transform("payload.failedMessage").routeByException(r -> r.advice(retryAdvice())).get();
}
@Bean
public IntegrationFlow exceptionHandlingFlow(@Qualifier("exceptionJmsTemplate") JmsTemplate exceptionJmsTemplate) {
return IntegrationFlows.from(exceptionQueueChannel()).transform("payload.failedMessage").handle(Jms.outboundAdapter(exceptionJmsTemplate).extractPayload(true).get())
.get();
}
@Bean(name = "multiSubcriptionChannel")
public PublishSubscribeChannel multiSubcriptionChannel(@Qualifier("executor") ThreadPoolTaskExecutor executor) {
return new PublishSubscribeChannel(executor);
}
@Bean(name = "headerValueRouter")
public HeaderValueRouter headerValueRouter(@Qualifier("multiSubcriptionChannel") PublishSubscribeChannel multiSubcriptionChannel) {
multiSubcriptionChannel.subscribe(Jms.outboundAdapter(getTargetJmsTemplate()).extractPayload(true).get());
HeaderValueRouter router = new HeaderValueRouter("jms_correlationId");
router.setChannelMapping("77", multiSubcriptionChannel.getBeanName());
return router;
}
@Bean
public IntegrationFlow inFlow(@Qualifier("headerValueRouter") HeaderValueRouter headerValueRouter) {
return IntegrationFlows.from(jmsInboundChannel()).route(headerValueRouter).get();
}
@Bean(name = "listenerErrorChannel")
public PublishSubscribeChannel listenerErrorChannel() {
return new PublishSubscribeChannel();
}
@Bean
public IntegrationFlow listenerErrorHandlingFlow() {
return IntegrationFlows.from(listenerErrorChannel())
.handle(message -> log.error("LISTENER error (Headers): [{}], (Payload): [{}]",
((MessagingException) message.getPayload()).getFailedMessage().getHeaders(),
((MessagingException) message.getPayload()).getFailedMessage().getPayload())).get();
}
@Bean(name = "jmsInboundChannel")
public JmsMessageDrivenEndpoint jmsInboundChannel() {
return Jms.messageDrivenChannelAdapter(routingCachingJmsConnectionFactory()).
configureListenerContainer(
defaultMessageListenerContainerJmsListenerContainerSpec ->
defaultMessageListenerContainerJmsListenerContainerSpec
.destinationResolver(routingJmsDestinationResolver()).acceptMessagesWhileStopping(false)
.sessionTransacted(true).sessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE)
.concurrency(routingSourceJmsLowerUpperConcurrentThreadCount).concurrentConsumers(3).maxConcurrentConsumers(3))
.errorChannel(listenerErrorChannel()).shutdownContainerOnStop(false)
.destination(routingSourceJmsQueue).autoStartup(true)
.get();
}
...
}
日志:
15:28:20.839 [DefaultMessageListenerContainer-3] DEBUG o.s.j.l.DefaultMessageListenerContainer - Received message of type [class weblogic.jms.common.TextMessageImpl] from consumer [weblogic.jms.client.WLConsumerImpl@12af28a8] of session [Cached JMS Session: weblogic.jms.client.WLSessionImpl@4e918fd1]
15:28:20.841 [DefaultMessageListenerContainer-3] DEBUG o.s.i.j.ChannelPublishingJmsMessageListener - converted JMS Message [TextMessage[ID:<322457.1582057700831.0>, WWW]] to integration Message payload [WWW]
15:28:20.845 [DefaultMessageListenerContainer-3] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'inFlow.channel#0', message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.846 [DefaultMessageListenerContainer-3] DEBUG o.s.i.router.HeaderValueRouter - headerValueRouter received message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.846 [DefaultMessageListenerContainer-3] DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'multiSubcriptionChannel', message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.848 [executor-1] DEBUG o.s.i.jms.JmsSendingMessageHandler - org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11 received message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.849 [DefaultMessageListenerContainer-3] DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'multiSubcriptionChannel', message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.850 [DefaultMessageListenerContainer-3] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'inFlow.channel#0', message: GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}]
15:28:20.864 [executor-1] DEBUG o.s.jms.core.JmsTemplate - Executing callback on JMS Session: weblogic.jms.client.WLSessionImpl@24067d07
15:28:20.865 [executor-1] DEBUG o.springframework.jndi.JndiTemplate - Looking up JNDI object with name [jdbc/NON_EXISTING_QUEUE]
15:28:20.903 [executor-1] DEBUG o.s.j.s.d.JndiDestinationResolver - Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI
javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc'
at weblogic.rjvm.ResponseImpl.unmarshalReturn(ResponseImpl.java:234)
at weblogic.rmi.cluster.ClusterableRemoteRef.invoke(ClusterableRemoteRef.java:348)
at weblogic.rmi.cluster.ClusterableRemoteRef.invoke(ClusterableRemoteRef.java:259)
at weblogic.jndi.internal.ServerNamingNode_1034_WLStub.lookup(Unknown Source)
at weblogic.jndi.internal.WLContextImpl.lookup(WLContextImpl.java:423)
at weblogic.jndi.internal.WLContextImpl.lookup(WLContextImpl.java:411)
at javax.naming.InitialContext.lookup(InitialContext.java:417)
at org.springframework.jndi.JndiTemplate.lambda$lookup[=12=](JndiTemplate.java:157)
at org.springframework.jndi.JndiTemplate.execute(JndiTemplate.java:92)
at org.springframework.jndi.JndiTemplate.lookup(JndiTemplate.java:157)
at org.springframework.jndi.JndiTemplate.lookup(JndiTemplate.java:179)
at org.springframework.jndi.JndiLocatorSupport.lookup(JndiLocatorSupport.java:96)
at org.springframework.jms.support.destination.JndiDestinationResolver.resolveDestinationName(JndiDestinationResolver.java:114)
at org.springframework.jms.support.destination.JmsDestinationAccessor.resolveDestinationName(JmsDestinationAccessor.java:115)
at org.springframework.jms.core.JmsTemplate.lambda$send(JmsTemplate.java:585)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:504)
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:584)
at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:691)
at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:671)
at org.springframework.integration.jms.JmsSendingMessageHandler.send(JmsSendingMessageHandler.java:219)
at org.springframework.integration.jms.JmsSendingMessageHandler.handleMessageInternal(JmsSendingMessageHandler.java:184)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.access[=12=]0(BroadcastingDispatcher.java:56)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.run(BroadcastingDispatcher.java:204)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute[=12=](ErrorHandlingTaskExecutor.java:57)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc'
at weblogic.jndi.internal.BasicNamingNode.newNameNotFoundException(BasicNamingNode.java:1180)
at weblogic.jndi.internal.BasicNamingNode.lookupHere(BasicNamingNode.java:270)
...
at weblogic.security.acl.internal.AuthenticatedSubject.doAs(AuthenticatedSubject.java:363)
at weblogic.security.service.SecurityManager.runAs(SecurityManager.java:146)
at weblogic.rmi.internal.BasicServerRef.handleRequest(BasicServerRef.java:523)
at weblogic.rmi.internal.wls.WLSExecuteRequest.run(WLSExecuteRequest.java:118)
at weblogic.work.ExecuteThread.execute(ExecuteThread.java:311)
at weblogic.work.ExecuteThread.run(ExecuteThread.java:263)
15:28:20.909 [executor-1] DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
15:28:20.915 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Retry: count=0
15:28:20.916 [executor-1] DEBUG o.s.i.r.ErrorMessageExceptionTypeRouter - errorHandlingFlow.router#0 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
15:28:20.917 [executor-1] DEBUG o.s.r.b.ExponentialBackOffPolicy - Sleeping for 5000
15:28:25.918 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=1
15:28:25.918 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Retry: count=1
15:28:25.919 [executor-1] DEBUG o.s.i.r.ErrorMessageExceptionTypeRouter - errorHandlingFlow.router#0 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
15:28:25.919 [executor-1] DEBUG o.s.r.b.ExponentialBackOffPolicy - Sleeping for 10000
15:28:35.919 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=2
15:28:35.919 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Retry: count=2
15:28:35.920 [executor-1] DEBUG o.s.i.r.ErrorMessageExceptionTypeRouter - errorHandlingFlow.router#0 received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
15:28:35.920 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=3
15:28:35.920 [executor-1] DEBUG o.s.retry.support.RetryTemplate - Retry failed last attempt: count=3
15:28:35.921 [executor-1] DEBUG o.s.i.h.a.ErrorMessageSendingRecoverer - Sending ErrorMessage: failedMessage: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined.
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:198)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.cloneAndExecute(AbstractRequestHandlerAdvice.java:93)
at org.springframework.integration.handler.advice.RequestHandlerRetryAdvice.lambda$doInvoke(RequestHandlerRetryAdvice.java:82)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.integration.handler.advice.RequestHandlerRetryAdvice.doInvoke(RequestHandlerRetryAdvice.java:82)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:70)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
at com.sun.proxy.$Proxy93.handleMessage(Unknown Source)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:95)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute[=12=](ErrorHandlingTaskExecutor.java:60)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
15:28:35.921 [executor-1] DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'exceptionQueueChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined., failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}], headers={id=7d86b3e2-287f-e867-af54-0bb6744c2853, timestamp=1582057715920}]
15:28:35.921 [executor-1] DEBUG o.s.i.jms.JmsSendingMessageHandler - org.springframework.integration.jms.JmsSendingMessageHandler@38b61501 received message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined., failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}], headers={id=7d86b3e2-287f-e867-af54-0bb6744c2853, timestamp=1582057715920}]
15:28:35.923 [executor-1] DEBUG o.s.jms.core.JmsTemplate - Executing callback on JMS Session: weblogic.jms.client.WLSessionImpl@e0750f9
15:28:35.924 [executor-1] DEBUG o.springframework.jndi.JndiTemplate - Looking up JNDI object with name [jdbc/JMSExceptionQueue]
15:28:35.926 [executor-1] DEBUG o.s.j.s.d.JndiDestinationResolver - Located object with JNDI name [jdbc/JMSExceptionQueue]
15:28:35.944 [executor-1] DEBUG o.s.jms.core.JmsTemplate - Sending created message: ObjectMessage[null,org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined., failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]]
15:28:35.955 [executor-1] DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'exceptionQueueChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: No channel resolved by router 'errorHandlingFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0' and no 'defaultOutputChannel' defined., failedMessage=ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}], headers={id=7d86b3e2-287f-e867-af54-0bb6744c2853, timestamp=1582057715920}]
15:28:35.955 [executor-1] DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'errorChannel', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler@f9d2b11]; nested exception is org.springframework.jms.support.destination.DestinationResolutionException: Destination [jdbc/NON_EXISTING_QUEUE] not found in JNDI; nested exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc' [Root exception is javax.naming.NameNotFoundException: Unable to resolve 'jdbc.NON_EXISTING_QUEUE'. Resolved 'jdbc']; remaining name 'NON_EXISTING_QUEUE', failedMessage=GenericMessage [payload=WWW, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=SystemModule!SourceQueue, jms_correlationId=77, id=ca2e8e8f-9b15-fbb2-4940-8b72c601c105, priority=4, jms_timestamp=1582057700831, jms_messageId=ID:<322457.1582057700831.0>, timestamp=1582057700845}], headers={id=7097a41a-6def-cff7-6b65-6ec42cf0ba26, timestamp=1582057700909}]
就像您在日志记录处理程序中所做的那样:
@Bean
public IntegrationFlow listenerErrorHandlingFlow() {
return IntegrationFlows.from(listenerErrorChannel())
.handle(message -> log.error("LISTENER error (Headers): [{}], (Payload): [{}]",
((MessagingException) message.getPayload()).getFailedMessage().getHeaders(),
((MessagingException) message.getPayload()).getFailedMessage().getPayload())).get();
}
您需要向错误流中添加一个转换器,以通过从异常中提取 failedMessage
来转换消息。
.transform("payload.failedMessage")
.handle(Jms...)
.get();