将重新排序器添加到输出通道会删除 NoSuchBeanDefinitionException 并更改 ChannelMapper 行为
Adding resequencer to output channel removes NoSuchBeanDefinitionException and changes ChannelMapper behaviour
我正在使用多个 IntegrationFlows 来处理传入的消息。在第一个通道上,我 select 用于处理的初始通道:
@Bean
public IntegrationFlow mappingFlow() {
return IntegrationFlows.from(inputChannel())
//route payload to channel
.handle(Message.class, (payload, header) -> (byte[])payload.getPayload())
.<byte[], Integer> route(p -> Parser.getType(p)),
mapping -> mapping
.channelMapping(1, "1Channel")
.channelMapping(2, "2Channel")
.channelMapping(3, "3Channel")
.channelMapping(4, "4Channel")
)
.get();
}
中间还有其他几个 Integrationflow,它们处理消息,然后将传入有效负载中的序列号复制到消息 headers:
@Bean
public IntegrationFlow process1Channel() {
return IntegrationFlows.from("1Channel")
.enrichHeaders(h ->
h.headerFunction(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, message -> Parser.getSequenceNumber(byte[])(message.getPayload())
.handle(new GenericHandler<byte[]>() {
@Override
public Object handle(byte[] payload, MessageHeaders headers) {
...
return output;
}
})
.channel("output")
.get();
}
最后,有一个输出通道,它将创建的回复消息发送回调用者,这样工作得很好:
@Bean
public IntegrationFlow processOutput() {
return IntegrationFlows.from("output")
.handle(new GenericHandler<byte[]>() {
@Override
public Object handle(byte[] payload, MessageHeaders headers) {
...
return payload;
}
})
.get();
}
但是,由于我需要确保重新排序是基于前面集成流程中设置的 SequenceNumber,所以我尝试添加以下重新排序(可能有更好的方法来指定 correlationExpression,但不指定一个结果 'java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?'):
.resequence(r -> r.releasePartialSequences(true).order(Ordered.LOWEST_PRECEDENCE).correlationExpression("headers[ip_connectionId]"))
现在的问题是,它可以工作,但如果发送 unknown/undefined 输入消息类型(例如消息类型 8),则不再有异常:
原因:org.springframework.beans.factory.NoSuchBeanDefinitionException:没有可用的名为“8”的 bean
推杆:
// .resolutionRequired(true)
对初始流的通道映射也没有影响,只要将重序列添加到输出通道,它似乎就会被忽略(输出通道是唯一的地方,我把重序列放在那里) .此外,如果将重新序列添加到输出,输入上的映射似乎根本不处理传入的未知消息,整个处理就卡住了。
我是不是通过在上面添加重新排序方法做错了什么,我是否缺少默认值,我需要在指定重新排序时明确设置或者这是一个错误(在 outputChannel 上添加重新排序会更改 inputChannel 映射行为)?
谢谢。
仅当您的邮件具有 IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER
header 时,重新排序器才能正常工作。然后,当您的发布策略 returns true
时,组中的消息将被排序并仅发出那些连续且中间没有间隙的消息。
您的路由器映射问题可能与重新排序器问题完全无关:无论如何,resolutionRequired
默认为 true
...
我正在使用多个 IntegrationFlows 来处理传入的消息。在第一个通道上,我 select 用于处理的初始通道:
@Bean
public IntegrationFlow mappingFlow() {
return IntegrationFlows.from(inputChannel())
//route payload to channel
.handle(Message.class, (payload, header) -> (byte[])payload.getPayload())
.<byte[], Integer> route(p -> Parser.getType(p)),
mapping -> mapping
.channelMapping(1, "1Channel")
.channelMapping(2, "2Channel")
.channelMapping(3, "3Channel")
.channelMapping(4, "4Channel")
)
.get();
}
中间还有其他几个 Integrationflow,它们处理消息,然后将传入有效负载中的序列号复制到消息 headers:
@Bean
public IntegrationFlow process1Channel() {
return IntegrationFlows.from("1Channel")
.enrichHeaders(h ->
h.headerFunction(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, message -> Parser.getSequenceNumber(byte[])(message.getPayload())
.handle(new GenericHandler<byte[]>() {
@Override
public Object handle(byte[] payload, MessageHeaders headers) {
...
return output;
}
})
.channel("output")
.get();
}
最后,有一个输出通道,它将创建的回复消息发送回调用者,这样工作得很好:
@Bean
public IntegrationFlow processOutput() {
return IntegrationFlows.from("output")
.handle(new GenericHandler<byte[]>() {
@Override
public Object handle(byte[] payload, MessageHeaders headers) {
...
return payload;
}
})
.get();
}
但是,由于我需要确保重新排序是基于前面集成流程中设置的 SequenceNumber,所以我尝试添加以下重新排序(可能有更好的方法来指定 correlationExpression,但不指定一个结果 'java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing?'):
.resequence(r -> r.releasePartialSequences(true).order(Ordered.LOWEST_PRECEDENCE).correlationExpression("headers[ip_connectionId]"))
现在的问题是,它可以工作,但如果发送 unknown/undefined 输入消息类型(例如消息类型 8),则不再有异常: 原因:org.springframework.beans.factory.NoSuchBeanDefinitionException:没有可用的名为“8”的 bean
推杆:
// .resolutionRequired(true)
对初始流的通道映射也没有影响,只要将重序列添加到输出通道,它似乎就会被忽略(输出通道是唯一的地方,我把重序列放在那里) .此外,如果将重新序列添加到输出,输入上的映射似乎根本不处理传入的未知消息,整个处理就卡住了。
我是不是通过在上面添加重新排序方法做错了什么,我是否缺少默认值,我需要在指定重新排序时明确设置或者这是一个错误(在 outputChannel 上添加重新排序会更改 inputChannel 映射行为)?
谢谢。
仅当您的邮件具有 IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER
header 时,重新排序器才能正常工作。然后,当您的发布策略 returns true
时,组中的消息将被排序并仅发出那些连续且中间没有间隙的消息。
您的路由器映射问题可能与重新排序器问题完全无关:无论如何,resolutionRequired
默认为 true
...