将重新排序器添加到输出通道会删除 NoSuchBeanDefinitionException 并更改 ChannelMapper 行为

Adding resequencer to output channel removes NoSuchBeanDefinitionException and changes ChannelMapper behaviour

我正在使用多个 IntegrationFlows 来处理传入的消息。在第一个通道上,我 select 用于处理的初始通道:

@Bean
public IntegrationFlow mappingFlow() {
       return IntegrationFlows.from(inputChannel())
            //route payload to channel
            .handle(Message.class, (payload, header) -> (byte[])payload.getPayload())
            .<byte[], Integer> route(p -> Parser.getType(p)), 
                    mapping -> mapping
                     .channelMapping(1, "1Channel")
                     .channelMapping(2, "2Channel")
                     .channelMapping(3, "3Channel")
                     .channelMapping(4, "4Channel")
                     )
            .get();
}

中间还有其他几个 Integrationflow,它们处理消息,然后将传入有效负载中的序列号复制到消息 headers:

@Bean
public IntegrationFlow process1Channel() {
    return IntegrationFlows.from("1Channel")
            .enrichHeaders(h ->
            h.headerFunction(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, message -> Parser.getSequenceNumber(byte[])(message.getPayload())
            .handle(new GenericHandler<byte[]>() {                          
                @Override
                public Object handle(byte[] payload, MessageHeaders headers) {
                    ...
                    return output;
                }

              })
              .channel("output")
              .get();
}

最后,有一个输出通道,它将创建的回复消息发送回调用者,这样工作得很好:

@Bean
public IntegrationFlow processOutput() {
    return IntegrationFlows.from("output")
            .handle(new GenericHandler<byte[]>() {                          
                @Override
                public Object handle(byte[] payload, MessageHeaders headers) {
                    ...
                    return payload;
                }

              })
              .get();
}

但是,由于我需要确保重新排序是基于前面集成流程中设置的 SequenceNumber,所以我尝试添加以下重新排序(可能有更好的方法来指定 correlationExpression,但不指定一个结果 'java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?'):

.resequence(r -> r.releasePartialSequences(true).order(Ordered.LOWEST_PRECEDENCE).correlationExpression("headers[ip_connectionId]"))

现在的问题是,它可以工作,但如果发送 unknown/undefined 输入消息类型(例如消息类型 8),则不再有异常: 原因:org.springframework.beans.factory.NoSuchBeanDefinitionException:没有可用的名为“8”的 bean

推杆:

//                       .resolutionRequired(true)

对初始流的通道映射也没有影响,只要将重序列添加到输出通道,它似乎就会被忽略(输出通道是唯一的地方,我把重序列放在那里) .此外,如果将重新序列添加到输出,输入上的映射似乎根本不处理传入的未知消息,整个处理就卡住了。

我是不是通过在上面添加重新排序方法做错了什么,我是否缺少默认值,我需要在指定重新排序时明确设置或者这是一个错误(在 outputChannel 上添加重新排序会更改 inputChannel 映射行为)?

谢谢。

仅当您的邮件具有 IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER header 时,重新排序器才能正常工作。然后,当您的发布策略 returns true 时,组中的消息将被排序并仅发出那些连续且中间没有间隙的消息。

您的路由器映射问题可能与重新排序器问题完全无关:无论如何,resolutionRequired 默认为 true...