Spring Integration 5.0 Reactor 类型支持

Spring Integration 5.0 Reactor types support

来自发行说明 (https://spring.io/blog/2017/11/29/spring-integration-5-0-ga-available):

  • Reactive Streams support via FluxMessageChannel, ReactiveStreamsConsumer and direct org.reactivestreams.Subscriber implementation in the AbstractMessageHandler;

我对 Reactor 支持的理解是,例如您可以从 transformer/handler return Mono/Flux,并且 Spring Integration 会自动将其转换为 Messages,同时尊重背压。不幸的是,我不能让它那样工作,例如:

IntegrationFlows.from("input")
                .handle((p, h) -> Flux.just(1, 2, 3))
                .log("l1")
                .channel("output")
                .get();

仍然记录一条具有 FluxArray 类型负载的消息,而不是三条具有整数负载的消息。

2017-12-18 17:12:33.262  INFO 97471 --- [nio-8080-exec-1] l1 : GenericMessage [payload=FluxArray, headers={id=a9701681-9945-f953-8b72-df369c2982a3, timestamp=1513613553262}]

此外,根据此行为和新内容,文档中没有任何内容

FluxMessageChannel, ReactiveStreamsConsumer and direct org.reactivestreams.Subscriber implementation in the AbstractMessageHandler

所以我的问题是,我是否正确理解了已实施的 Reactor 支持,我在哪里可以找到关于该主题的任何信息?

因为我们在这里进行消息传递,所以对于消息而言,您 return 从您的服务中获得什么样的有效负载并不重要,所有内容都按原样包装到 Message 中。您需要一个特殊的组件来理解此有效负载。其中之一是 Splitter。这确定您的有效负载是反应流 Publisher 并将其迭代为 Flux.

另一个组件是 WebFluxInboundEndpoint,它本身支持这种有效载荷。

您的自定义服务激活器可能希望 Flux 作为要处理的参数。

但没有任何事情会自动发生。 Spring 集成支持 Reactive 类型,但不会在没有最终用户偏好的情况下进行处理。

顺便说一句,splitter 应该与 FluxMessageChannel 一起作为输出提供,以通过背压方式处理拆分的 Flux

随时提出有关记录 FluxMessageChannel 的 A JIRA。我们确实错过了这一点。 ReactiveStreamsConsumer 也需要更多的关爱,我们为 5.1 制定了一些计划来改进 Reactive Streams 模型,我们将尝试使其更加灵活,甚至喜欢默认打开它的选项。不过从今天开始什么都不能保证了。