路由单 Spring 集成 (4.2)

Routing Slip Spring Integration (4.2)

我正在尝试使用 Spring 集成来实施路由单(EI 模式)。我做的配置是

 @Bean
    @Transformer(inputChannel = "routingServiceChannel")
    public HeaderEnricher headerEnricher() {
        return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
                new RoutingSlipHeaderValueMessageProcessor("routingChannel2",                   
                    "routingChannel1")));
    }

所以当它到达routingServiceChannel时,它会转到routingChannel2,然后是routingChannel1。但在我的例子中,它总是抛出一个异常,即它没有回复通道。当我设置输出通道时,比如

@Transformer(inputChannel = "routingServiceChannel", outputChannel=""xyzChannel)

然后路由发生在 xyzChannel 而不是去 routingChannel2 和 routingChannel1。

当我调试 spring 集成核心中的代码时,我偶然发现了这段代码, 在

class AbstractReplyProducingMessageHandler

protected final void handleMessageInternal(Message<?> message) {
        Object result;
        if (this.advisedRequestHandler == null) {
            result = handleRequestMessage(message);
        } else {
            result = doInvokeAdvisedRequestHandler(message);
        }
        if (result != null) {
            sendOutputs(result, message);
        } else if (this.requiresReply) {
            throw new ReplyRequiredException(message, "No reply produced by handler '" + getComponentName()
                    + "', and its 'requiresReply' property is set to true.");
        } else if (logger.isDebugEnabled()) {
            logger.debug("handler '" + this + "' produced no reply for request Message: " + message);
        }
    }

在 handle message 方法中,他们获取 routingSlip 映射并将其分配给结果。 并且

protected void produceOutput(Object reply, Message<?> requestMessage) {
        MessageHeaders requestHeaders = requestMessage.getHeaders();

        Object replyChannel = null;
        if (getOutputChannel() == null) {
            Map<?, ?> routingSlipHeader = requestHeaders.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
            if (routingSlipHeader != null) {
                Assert.isTrue(routingSlipHeader.size() == 1, "The RoutingSlip header value must be a SingletonMap");
                Object key = routingSlipHeader.keySet().iterator().next();
                Object value = routingSlipHeader.values().iterator().next();
                Assert.isInstanceOf(List.class, key, "The RoutingSlip key must be List");
                Assert.isInstanceOf(Integer.class, value, "The RoutingSlip value must be Integer");
                List<?> routingSlip = (List<?>) key;
                AtomicInteger routingSlipIndex = new AtomicInteger((Integer) value);
                replyChannel = getOutputChannelFromRoutingSlip(reply, requestMessage, routingSlip, routingSlipIndex);
                if (replyChannel != null) {
                    // TODO Migrate to the SF MessageBuilder
                    AbstractIntegrationMessageBuilder<?> builder = null;
                    if (reply instanceof Message) {
                        builder = this.getMessageBuilderFactory().fromMessage((Message<?>) reply);
                    } else if (reply instanceof AbstractIntegrationMessageBuilder) {
                        builder = (AbstractIntegrationMessageBuilder<?>) reply;
                    } else {
                        builder = this.getMessageBuilderFactory().withPayload(reply);
                    }
                    builder.setHeader(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
                            Collections.singletonMap(routingSlip, routingSlipIndex.get()));
                    reply = builder;
                }
            }

            if (replyChannel == null) {
                replyChannel = requestHeaders.getReplyChannel();
            }
        }

        Message<?> replyMessage = createOutputMessage(reply, requestHeaders);
        sendOutput(replyMessage, replyChannel);
    }

他们不是从回复中获取 routingSlip 配置,而是试图从 requestMessage 中获取。 我在这里错过了什么吗?我需要设置任何其他配置吗?

感谢您对主题的关注,顺便说一句! :)

一切看起来都不错,但您错过了路由单的要点。

首先你应该为消息配置它。由于路由单是 header,您应该使用 HeaderEnricher 将其添加到消息的 header 中。

路由确实是在 requestMessage 的下游造成的,而不是 reply。路由单已超出 HeaderEnricher

当然 HeaderEnricherrequestMessage 可能会发生这种情况。

如果您想在 HeaderEnricher 之后查看路由单,您应该配置如下内容:

@BridgeTo
@Bean
public MessageChannel xyzChannel() {
    return new DirectChannel();
}

注意:header在HeaderEnricher逻辑期间不可用。只在下游。 Routing Slip就是其中之一。