Spring Integration 5.0 Reactor 类型支持
Spring Integration 5.0 Reactor types support
来自发行说明 (https://spring.io/blog/2017/11/29/spring-integration-5-0-ga-available):
- Reactive Streams support via FluxMessageChannel,
ReactiveStreamsConsumer and direct org.reactivestreams.Subscriber
implementation in the AbstractMessageHandler;
我对 Reactor 支持的理解是,例如您可以从 transformer/handler return Mono/Flux,并且 Spring Integration 会自动将其转换为 Messages,同时尊重背压。不幸的是,我不能让它那样工作,例如:
IntegrationFlows.from("input")
.handle((p, h) -> Flux.just(1, 2, 3))
.log("l1")
.channel("output")
.get();
仍然记录一条具有 FluxArray 类型负载的消息,而不是三条具有整数负载的消息。
2017-12-18 17:12:33.262 INFO 97471 --- [nio-8080-exec-1] l1 : GenericMessage [payload=FluxArray, headers={id=a9701681-9945-f953-8b72-df369c2982a3, timestamp=1513613553262}]
此外,根据此行为和新内容,文档中没有任何内容
FluxMessageChannel,
ReactiveStreamsConsumer and direct org.reactivestreams.Subscriber
implementation in the AbstractMessageHandler
所以我的问题是,我是否正确理解了已实施的 Reactor 支持,我在哪里可以找到关于该主题的任何信息?
因为我们在这里进行消息传递,所以对于消息而言,您 return 从您的服务中获得什么样的有效负载并不重要,所有内容都按原样包装到 Message
中。您需要一个特殊的组件来理解此有效负载。其中之一是 Splitter
。这确定您的有效负载是反应流 Publisher
并将其迭代为 Flux
.
另一个组件是 WebFluxInboundEndpoint
,它本身支持这种有效载荷。
您的自定义服务激活器可能希望 Flux
作为要处理的参数。
但没有任何事情会自动发生。 Spring 集成支持 Reactive 类型,但不会在没有最终用户偏好的情况下进行处理。
顺便说一句,splitter
应该与 FluxMessageChannel
一起作为输出提供,以通过背压方式处理拆分的 Flux
。
随时提出有关记录 FluxMessageChannel
的 A JIRA。我们确实错过了这一点。 ReactiveStreamsConsumer
也需要更多的关爱,我们为 5.1
制定了一些计划来改进 Reactive Streams 模型,我们将尝试使其更加灵活,甚至喜欢默认打开它的选项。不过从今天开始什么都不能保证了。
来自发行说明 (https://spring.io/blog/2017/11/29/spring-integration-5-0-ga-available):
- Reactive Streams support via FluxMessageChannel, ReactiveStreamsConsumer and direct org.reactivestreams.Subscriber implementation in the AbstractMessageHandler;
我对 Reactor 支持的理解是,例如您可以从 transformer/handler return Mono/Flux,并且 Spring Integration 会自动将其转换为 Messages,同时尊重背压。不幸的是,我不能让它那样工作,例如:
IntegrationFlows.from("input")
.handle((p, h) -> Flux.just(1, 2, 3))
.log("l1")
.channel("output")
.get();
仍然记录一条具有 FluxArray 类型负载的消息,而不是三条具有整数负载的消息。
2017-12-18 17:12:33.262 INFO 97471 --- [nio-8080-exec-1] l1 : GenericMessage [payload=FluxArray, headers={id=a9701681-9945-f953-8b72-df369c2982a3, timestamp=1513613553262}]
此外,根据此行为和新内容,文档中没有任何内容
FluxMessageChannel, ReactiveStreamsConsumer and direct org.reactivestreams.Subscriber implementation in the AbstractMessageHandler
所以我的问题是,我是否正确理解了已实施的 Reactor 支持,我在哪里可以找到关于该主题的任何信息?
因为我们在这里进行消息传递,所以对于消息而言,您 return 从您的服务中获得什么样的有效负载并不重要,所有内容都按原样包装到 Message
中。您需要一个特殊的组件来理解此有效负载。其中之一是 Splitter
。这确定您的有效负载是反应流 Publisher
并将其迭代为 Flux
.
另一个组件是 WebFluxInboundEndpoint
,它本身支持这种有效载荷。
您的自定义服务激活器可能希望 Flux
作为要处理的参数。
但没有任何事情会自动发生。 Spring 集成支持 Reactive 类型,但不会在没有最终用户偏好的情况下进行处理。
顺便说一句,splitter
应该与 FluxMessageChannel
一起作为输出提供,以通过背压方式处理拆分的 Flux
。
随时提出有关记录 FluxMessageChannel
的 A JIRA。我们确实错过了这一点。 ReactiveStreamsConsumer
也需要更多的关爱,我们为 5.1
制定了一些计划来改进 Reactive Streams 模型,我们将尝试使其更加灵活,甚至喜欢默认打开它的选项。不过从今天开始什么都不能保证了。