Spring 集成:手动渠道处理

Spring Integration: Manual channel handling

我想要的:构建一个

的可配置库

我的想法:

我怀疑这是做我想做的事情的最佳方式。感觉就像在滥用 spring 集成。在这一点上欢迎任何建议。

但是总的来说(至少在我的测试中)这似乎有效。但是当我 运行 我的应用程序时,我收到如下错误: "Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available".

这尤其糟糕,因为在此异常之后,发布者声称不再有订阅者。因此,在实际应用中,不再处理任何消息。

我不确定这条消息是什么意思,但我可以重现它(但不明白为什么):

@Test
public void channelTest() {
    integrationFlowContext
            .registration(
                    IntegrationFlows.from("any-channel").bridge().get()
            )
            .register();

    registryUtil.registerBean(
            "any-publisher",
            Publisher.class,
            () -> IntegrationFlows.from("any-channel").toReactivePublisher()
    );

    Flux
            .from((Publisher<Message<?>>) applicationContext.getBean("any-publisher"))
            .subscribe(System.out::println);

    MessageChannel messageChannel = applicationContext.getBean("any-channel", MessageChannel.class);
    try {
        messageChannel.send(MessageBuilder.withPayload("test").build());
    } catch (Throwable t) {
        log.error("Error: ", t);
    }

}

我当然阅读了部分 spring 集成文档,但不太了解幕后发生的事情。因此,我想猜测可能的错误原因。

编辑:

然而,这有效:

@TestConfiguration
static class Config {

    GenericApplicationContext applicationContext;
    Config(
            GenericApplicationContext applicationContext,
            IntegrationFlowContext integrationFlowContext
    ) {
        this.applicationContext = applicationContext;
        // optional here, but needed for some reason in my library,
        // since I can't find the channel beans like I will do here,
        // if I didn't register them like so:
        //integrationFlowContext
        //    .registration(
        //    IntegrationFlows.from("any-channel").bridge().get())
        //    .register();

        applicationContext.registerBean(
                "any-publisher",
                Publisher.class,
                () -> IntegrationFlows.from("any-channel").toReactivePublisher()
        );

    }

    @PostConstruct
    void connect(){
        Flux
                .from((Publisher<Message<?>>) applicationContext.getBean("any-publisher"))
                .subscribe(System.out::println);
    }

}

@Autowired
ApplicationContext applicationContext;

@Autowired
IntegrationFlowContext integrationFlowContext;

@Test
@SneakyThrows
public void channel2Test() {

    MessageChannel messageChannel = applicationContext.getBean("any-channel", MessageChannel.class);
    try {
        messageChannel.send(MessageBuilder.withPayload("test").build());
    } catch (Throwable t) {
        log.error("Error: ", t);
    }

}

因此,显然我的上述问题与到达的消息有关 "too early" .. 我猜?!

不,您的问题与 round-robin 有关 any-channel bean 名称在 DirectChannel 上调度。

您定义了两个从该频道开始的 IntegrationFlow 实例,然后您声明了它们自己的订阅者,但在运行时它们都订阅了同一个 any-channel 实例。默认情况下,它带有 round-robin 平衡器。因此,一条消息发送给您的 Flux.from() 订阅者,而另一条消息发送给 bridge() 不知道如何处理您的消息,因此它尝试解析 replyChannel header.

因此你的解决方案只有一个 IntegrationFlows.from("any-channel").toReactivePublisher() 是正确的。尽管您可以只进行 FluxMessageChannel 注册并从一侧使用它来发送常规消息,而从另一侧将其用作 Flux.from().

的反应源