IntegrationFlows having filter on Error Channel
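I have the code below, which is meant to accept only messages of type 'shipment'. The MQ topic carries two types of messages, 'order' and 'shipment'.
The logic below works fine as long as I receive a 'shipment' message, but when the topic receives an invalid 'order' payload, an error is sent through the errorChannel, because I put the filter after .errorChannel().
My question is: can I put .filter() before .errorChannel()? I don't care about any message type other than 'shipment'.
Thanks in advance.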
@Bean
public IntegrationFlow jmsInboundFlow() throws Exception {
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(messageListenerContainer)
                    .errorChannel(inboundErrorChannel))
            .filter(Message.class, m -> m.getHeaders().get("message_type").equals("shipment"))
            .channel(routerChannel)
            .get();
}
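Edit:
errorChannel() works as intended when the payload is empty or invalid, but the problem is that it also reports errors for the other message types, which I'm not interested in.
The error I get for an 'order' message: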
FailedMessage: GenericMessage [payload=<order></order>, headers={...}
org.springframework.messaging.MessageHandlingException:
error occurred in message handler [bean 'jmsInboundFlow.filter#0' for component 'jmsInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0';
org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'jmsInboundFlow.filter#0' for component 'jmsInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/win/managedinventorysubscriber/sicomponent/MsgDrivenChannelAdapter.class]'; from source: 'bean method jmsInboundFlow']; nested exception is java.lang.NullPointerException, failedMessage=GenericMessage [payload=<order></order>, headers={JMS_IBM_Character_Set=IBM037, JMS_IBM_MsgType=8, sales_order_guid=421028b0-a09d-41e3-878c-f6ce0163e18d, sales_order_action_code=delete, jms_destination=topic:///WISE/ORDERSHIPMENT, sales_order_company_nbr=00028, JMSXUserID=qmqmuser , JMS_IBM_Encoding=273, message_type=order, priority=0, jms_timestamp=1643727671960, JMSXAppID=T1WSERVICEQA , JMS_IBM_PutApplType=26, JMS_IBM_Format=MQSTR , jms_redelivered=false, JMS_IBM_PutDate=20220201, JMSXDeliveryCount=1, sales_order_nbr=007265, jms_correlationId=ID:414d5120543157534552564943455141a290cb6009df8425, JMS_IBM_PutTime=15011196, id=2ea81c40-ce4e-990c-aba7-961c4b464304, jms_messageId=ID:414d5120543157534552564943455141a290cb6000838d25, timestamp=1643727671913}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143)
at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:428)
at org.springframework.integration.jms.ChannelPublishingJmsMessageListener$GatewayDelegate.send(ChannelPublishingJmsMessageListener.java:509)
at org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:345)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:744)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:704)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1173)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1165)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1062)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
at com.win.managedinventorysubscriber.sicomponent.MsgDrivenChannelAdapter.lambda$jmsInboundFlow$0(MsgDrivenChannelAdapter.java:33)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:97)
at org.springframework.integration.filter.AbstractMessageProcessingSelector.accept(AbstractMessageProcessingSelector.java:65)
at org.springframework.integration.filter.MessageFilter.doHandleRequestMessage(MessageFilter.java:171)
at org.springframework.integration.handler.AbstractReplyProducingPostProcessingMessageHandler.handleRequestMessage(AbstractReplyProducingPostProcessingMessageHandler.java:50)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
If you just want to ignore those messages, the filter by itself is enough.
public class MessageFilter extends AbstractReplyProducingPostProcessingMessageHandler
        implements DiscardingMessageHandler, ManageableLifecycle {
    private final MessageSelector selector;
    private boolean throwExceptionOnRejection;
    private MessageChannel discardChannel;
    private String discardChannelName;
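selector is the function that decides whether an incoming message is accepted.
throwExceptionOnRejection is false by default, so a message that does not pass the filter does not cause an exception.
discardChannel is null by default, so nothing is sent anywhere when a message is rejected.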
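To make those three settings concrete, here is a minimal plain-Java sketch of the behaviour described above. The FilterModel class is hypothetical and deliberately simplified; it is not Spring Integration's MessageFilter, and it uses a plain List as a stand-in for a discard channel.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical, simplified model of the filter settings described above.
// It is NOT Spring Integration's MessageFilter, only an illustration.
final class FilterModel<T> {
    private final Predicate<T> selector;             // decides whether a message passes
    private final boolean throwExceptionOnRejection; // false by default in Spring Integration
    private final List<T> discardChannel;            // null models "no discard channel"

    FilterModel(Predicate<T> selector, boolean throwExceptionOnRejection, List<T> discardChannel) {
        this.selector = selector;
        this.throwExceptionOnRejection = throwExceptionOnRejection;
        this.discardChannel = discardChannel;
    }

    // Returns the message if it is accepted, or an empty Optional if it is dropped.
    Optional<T> handle(T message) {
        if (selector.test(message)) {
            return Optional.of(message);
        }
        if (discardChannel != null) {
            discardChannel.add(message); // rejected messages go here only when configured
        }
        if (throwExceptionOnRejection) {
            throw new IllegalStateException("message rejected: " + message);
        }
        return Optional.empty(); // defaults: rejected message is silently dropped
    }

    public static void main(String[] args) {
        // Defaults as described above: no exception, no discard channel.
        FilterModel<String> filter = new FilterModel<>("shipment"::equals, false, null);
        System.out.println(filter.handle("shipment").isPresent()); // true
        System.out.println(filter.handle("order").isPresent());    // false, silently dropped
    }
}
```

With the defaults, a rejected 'order' message simply disappears; an exception only appears if the selector itself throws or if throwExceptionOnRejection is switched on.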
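So the filter's rejection logic in your configuration is not the root cause of that exception. Please share more information about the exception in your question.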
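One thing worth ruling out in the meantime: the trace in the question shows the NullPointerException coming from inside the filter lambda. The post does not show what was null there (the failed message does carry message_type=order), so the following is only a guess at a defensive rewrite. It is a minimal sketch that uses a plain Map in place of the message headers; the ShipmentHeaderCheck class and isShipment method are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating a null-safe header comparison.
final class ShipmentHeaderCheck {

    // Constant-first equals: yields false instead of throwing
    // when the header is absent or its value is null.
    static boolean isShipment(Map<String, Object> headers) {
        return "shipment".equals(headers.get("message_type"));
    }

    public static void main(String[] args) {
        Map<String, Object> shipment = new HashMap<>();
        shipment.put("message_type", "shipment");

        Map<String, Object> order = new HashMap<>();
        order.put("message_type", "order");

        Map<String, Object> noHeader = new HashMap<>();

        System.out.println(isShipment(shipment)); // true
        System.out.println(isShipment(order));    // false
        System.out.println(isShipment(noHeader)); // false, no NullPointerException
    }
}
```

In the flow from the question, the equivalent predicate would be m -> "shipment".equals(m.getHeaders().get("message_type")), since the message headers can be read like a Map.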
See the documentation for more information: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#filter