IntegrationFlows 在错误通道上具有过滤器

IntegrationFlows having filter on Error Channel

我有下面的代码,它只接受 'shipment' 类型的消息。基本上 MQ 主题接受两种类型的消息 'order' 和 'shipment'.

只要我收到 'shipment' 消息,下面的逻辑就可以正常工作,但如果主题收到无效的 'order' 有效负载,它会通过 errorChannel 抛出错误,因为我将过滤器放在 .errorChannel( ).

我的问题是,我是否可以将 .filter() 放在 .errorChannel() 之前?除了 'shipment' 类型之外,我不关心其他消息类型。

提前致谢。

@Bean
public IntegrationFlow jmsInboundFlow() throws Exception {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(messageListenerContainer)
        .errorChannel(inboundErrorChannel))
        .filter(Message.class, m -> m.getHeaders().get("message_type").equals("shipment"))
        .channel(routerChannel)
        .get();
}

编辑: 如果有效负载为空或无效,则 errorChannel() 可以正常工作,但这里的问题是它也会抛出我不感兴趣的其他消息类型。

我收到的错误消息 'order' 消息类型:

FailedMessage: GenericMessage [payload=<order></order>, headers={...}

org.springframework.messaging.MessageHandlingException: 
error occurred in message handler [bean 'jmsInboundFlow.filter#0' for component 'jmsInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0';

org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'jmsInboundFlow.filter#0' for component 'jmsInboundFlow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [com/win/managedinventorysubscriber/sicomponent/MsgDrivenChannelAdapter.class]'; from source: 'bean method jmsInboundFlow']; nested exception is java.lang.NullPointerException, failedMessage=GenericMessage [payload=<order></order>, headers={JMS_IBM_Character_Set=IBM037, JMS_IBM_MsgType=8, sales_order_guid=421028b0-a09d-41e3-878c-f6ce0163e18d, sales_order_action_code=delete, jms_destination=topic:///WISE/ORDERSHIPMENT, sales_order_company_nbr=00028, JMSXUserID=qmqmuser    , JMS_IBM_Encoding=273, message_type=order, priority=0, jms_timestamp=1643727671960, JMSXAppID=T1WSERVICEQA                , JMS_IBM_PutApplType=26, JMS_IBM_Format=MQSTR   , jms_redelivered=false, JMS_IBM_PutDate=20220201, JMSXDeliveryCount=1, sales_order_nbr=007265, jms_correlationId=ID:414d5120543157534552564943455141a290cb6009df8425, JMS_IBM_PutTime=15011196, id=2ea81c40-ce4e-990c-aba7-961c4b464304, jms_messageId=ID:414d5120543157534552564943455141a290cb6000838d25, timestamp=1643727671913}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143)
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:428)
    at org.springframework.integration.jms.ChannelPublishingJmsMessageListener$GatewayDelegate.send(ChannelPublishingJmsMessageListener.java:509)
    at org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:345)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:744)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:704)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1173)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1165)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1062)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
    at com.win.managedinventorysubscriber.sicomponent.MsgDrivenChannelAdapter.lambda$jmsInboundFlow[=11=](MsgDrivenChannelAdapter.java:33)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:97)
    at org.springframework.integration.filter.AbstractMessageProcessingSelector.accept(AbstractMessageProcessingSelector.java:65)
    at org.springframework.integration.filter.MessageFilter.doHandleRequestMessage(MessageFilter.java:171)
    at org.springframework.integration.handler.AbstractReplyProducingPostProcessingMessageHandler.handleRequestMessage(AbstractReplyProducingPostProcessingMessageHandler.java:50)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)

如果您只想忽略,filter 本身就足够了。

public class MessageFilter extends AbstractReplyProducingPostProcessingMessageHandler
        implements DiscardingMessageHandler, ManageableLifecycle {

    private final MessageSelector selector;

    private boolean throwExceptionOnRejection;

    private MessageChannel discardChannel;

    private String discardChannelName;

selector 是您决定传入消息的函数。

默认情况下throwExceptionOnRejection为false,所以消息不满足过滤器不会抛出异常。

discardChannel默认为null,所以当消息不满足过滤器时,不会发送任何拒绝。

因此,您配置中的过滤器不是该异常的根本原因。请分享有关您问题中异常的更多信息。

查看其文档以获取更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#filter