路由单 Spring 集成 (4.2)
Routing Slip Spring Integration (4.2)
我正在尝试使用 Spring 集成来实施路由单(EI 模式)。我做的配置是
@Bean
@Transformer(inputChannel = "routingServiceChannel")
public HeaderEnricher headerEnricher() {
return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
new RoutingSlipHeaderValueMessageProcessor("routingChannel2",
"routingChannel1")));
}
所以当它到达routingServiceChannel时,它会转到routingChannel2,然后是routingChannel1。但在我的例子中,它总是抛出一个异常,即它没有回复通道。当我设置输出通道时,比如
@Transformer(inputChannel = "routingServiceChannel", outputChannel=""xyzChannel)
然后路由发生在 xyzChannel 而不是去 routingChannel2 和 routingChannel1。
当我调试 spring 集成核心中的代码时,我偶然发现了这段代码,
在
class AbstractReplyProducingMessageHandler
protected final void handleMessageInternal(Message<?> message) {
Object result;
if (this.advisedRequestHandler == null) {
result = handleRequestMessage(message);
} else {
result = doInvokeAdvisedRequestHandler(message);
}
if (result != null) {
sendOutputs(result, message);
} else if (this.requiresReply) {
throw new ReplyRequiredException(message, "No reply produced by handler '" + getComponentName()
+ "', and its 'requiresReply' property is set to true.");
} else if (logger.isDebugEnabled()) {
logger.debug("handler '" + this + "' produced no reply for request Message: " + message);
}
}
在 handle message 方法中,他们获取 routingSlip 映射并将其分配给结果。
并且
protected void produceOutput(Object reply, Message<?> requestMessage) {
MessageHeaders requestHeaders = requestMessage.getHeaders();
Object replyChannel = null;
if (getOutputChannel() == null) {
Map<?, ?> routingSlipHeader = requestHeaders.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
if (routingSlipHeader != null) {
Assert.isTrue(routingSlipHeader.size() == 1, "The RoutingSlip header value must be a SingletonMap");
Object key = routingSlipHeader.keySet().iterator().next();
Object value = routingSlipHeader.values().iterator().next();
Assert.isInstanceOf(List.class, key, "The RoutingSlip key must be List");
Assert.isInstanceOf(Integer.class, value, "The RoutingSlip value must be Integer");
List<?> routingSlip = (List<?>) key;
AtomicInteger routingSlipIndex = new AtomicInteger((Integer) value);
replyChannel = getOutputChannelFromRoutingSlip(reply, requestMessage, routingSlip, routingSlipIndex);
if (replyChannel != null) {
// TODO Migrate to the SF MessageBuilder
AbstractIntegrationMessageBuilder<?> builder = null;
if (reply instanceof Message) {
builder = this.getMessageBuilderFactory().fromMessage((Message<?>) reply);
} else if (reply instanceof AbstractIntegrationMessageBuilder) {
builder = (AbstractIntegrationMessageBuilder<?>) reply;
} else {
builder = this.getMessageBuilderFactory().withPayload(reply);
}
builder.setHeader(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
Collections.singletonMap(routingSlip, routingSlipIndex.get()));
reply = builder;
}
}
if (replyChannel == null) {
replyChannel = requestHeaders.getReplyChannel();
}
}
Message<?> replyMessage = createOutputMessage(reply, requestHeaders);
sendOutput(replyMessage, replyChannel);
}
他们不是从回复中获取 routingSlip 配置,而是试图从 requestMessage 中获取。
我在这里错过了什么吗?我需要设置任何其他配置吗?
感谢您对主题的关注,顺便说一句! :)
一切看起来都不错,但您错过了路由单的要点。
首先你应该为消息配置它。由于路由单是 header
,您应该使用 HeaderEnricher
将其添加到消息的 header 中。
路由确实是在 requestMessage
的下游造成的,而不是 reply
。路由单已超出 HeaderEnricher
。
当然 HeaderEnricher
的 requestMessage
可能会发生这种情况。
如果您想在 HeaderEnricher
之后查看路由单,您应该配置如下内容:
@BridgeTo
@Bean
public MessageChannel xyzChannel() {
return new DirectChannel();
}
注意:新header在HeaderEnricher
逻辑期间不可用。只在下游。 Routing Slip就是其中之一。
我正在尝试使用 Spring 集成来实施路由单(EI 模式)。我做的配置是
@Bean
@Transformer(inputChannel = "routingServiceChannel")
public HeaderEnricher headerEnricher() {
return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
new RoutingSlipHeaderValueMessageProcessor("routingChannel2",
"routingChannel1")));
}
所以当它到达routingServiceChannel时,它会转到routingChannel2,然后是routingChannel1。但在我的例子中,它总是抛出一个异常,即它没有回复通道。当我设置输出通道时,比如
@Transformer(inputChannel = "routingServiceChannel", outputChannel=""xyzChannel)
然后路由发生在 xyzChannel 而不是去 routingChannel2 和 routingChannel1。
当我调试 spring 集成核心中的代码时,我偶然发现了这段代码, 在
class AbstractReplyProducingMessageHandler
protected final void handleMessageInternal(Message<?> message) {
Object result;
if (this.advisedRequestHandler == null) {
result = handleRequestMessage(message);
} else {
result = doInvokeAdvisedRequestHandler(message);
}
if (result != null) {
sendOutputs(result, message);
} else if (this.requiresReply) {
throw new ReplyRequiredException(message, "No reply produced by handler '" + getComponentName()
+ "', and its 'requiresReply' property is set to true.");
} else if (logger.isDebugEnabled()) {
logger.debug("handler '" + this + "' produced no reply for request Message: " + message);
}
}
在 handle message 方法中,他们获取 routingSlip 映射并将其分配给结果。 并且
protected void produceOutput(Object reply, Message<?> requestMessage) {
MessageHeaders requestHeaders = requestMessage.getHeaders();
Object replyChannel = null;
if (getOutputChannel() == null) {
Map<?, ?> routingSlipHeader = requestHeaders.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
if (routingSlipHeader != null) {
Assert.isTrue(routingSlipHeader.size() == 1, "The RoutingSlip header value must be a SingletonMap");
Object key = routingSlipHeader.keySet().iterator().next();
Object value = routingSlipHeader.values().iterator().next();
Assert.isInstanceOf(List.class, key, "The RoutingSlip key must be List");
Assert.isInstanceOf(Integer.class, value, "The RoutingSlip value must be Integer");
List<?> routingSlip = (List<?>) key;
AtomicInteger routingSlipIndex = new AtomicInteger((Integer) value);
replyChannel = getOutputChannelFromRoutingSlip(reply, requestMessage, routingSlip, routingSlipIndex);
if (replyChannel != null) {
// TODO Migrate to the SF MessageBuilder
AbstractIntegrationMessageBuilder<?> builder = null;
if (reply instanceof Message) {
builder = this.getMessageBuilderFactory().fromMessage((Message<?>) reply);
} else if (reply instanceof AbstractIntegrationMessageBuilder) {
builder = (AbstractIntegrationMessageBuilder<?>) reply;
} else {
builder = this.getMessageBuilderFactory().withPayload(reply);
}
builder.setHeader(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
Collections.singletonMap(routingSlip, routingSlipIndex.get()));
reply = builder;
}
}
if (replyChannel == null) {
replyChannel = requestHeaders.getReplyChannel();
}
}
Message<?> replyMessage = createOutputMessage(reply, requestHeaders);
sendOutput(replyMessage, replyChannel);
}
他们不是从回复中获取 routingSlip 配置,而是试图从 requestMessage 中获取。 我在这里错过了什么吗?我需要设置任何其他配置吗?
感谢您对主题的关注,顺便说一句! :)
一切看起来都不错,但您错过了路由单的要点。
首先你应该为消息配置它。由于路由单是 header
,您应该使用 HeaderEnricher
将其添加到消息的 header 中。
路由确实是在 requestMessage
的下游造成的,而不是 reply
。路由单已超出 HeaderEnricher
。
当然 HeaderEnricher
的 requestMessage
可能会发生这种情况。
如果您想在 HeaderEnricher
之后查看路由单,您应该配置如下内容:
@BridgeTo
@Bean
public MessageChannel xyzChannel() {
return new DirectChannel();
}
注意:新header在HeaderEnricher
逻辑期间不可用。只在下游。 Routing Slip就是其中之一。