从 SI 中的反应式单向处理程序订阅 Mono<Void>
Subscribing to Mono<Void> from reactive one-way handlers in SI
我从事 Reactor 项目已有一段时间了,但我对 SI 还很陌生,尤其是在 SI 中使用反应式组件时。
这可能是非常基本的,但我仍然无法理解,而且我阅读 Spring 文档越多,我就越不理解。
我有这个流程,我正在使用 returns Mono 的反应式适配器。
据我了解,一旦您使用 handler/gateway returns void
或 Mono<Void>
(单向 'MessageHandler'),则表示结束流程,您不能在下游添加任何其他内容。
所以这给我留下了这个:
@Bean
public IntegrationFlow flow(AdapterThatReturnsMonoVoid adapter) {
return f -> f
// transformations, routing and other stuff..
.handle(adapter::handleMessage)
}
现在。将如何完成对该 Mono 的订阅?
如果我想订阅那个 Mono 的频道怎么办?
即使 Mono 没有发出任何项目,我也希望能够订阅完成信号。
例如,如果我使用 MongoDb.reactiveOutboundChannelAdapter(...)
,当我 运行 流时,我可以看到负载实际上存储在 mongo 中。所以我假设订阅是自动完成的?
我想我的目的是获得某种手术完成的收据。
感谢您的帮助!
MongoDb.reactiveOutboundChannelAdapter(...)
由包装到 ReactiveMessageHandlerAdapter
的 ReactiveMongoDbStoringMessageHandler
表示,以真正自动进行订阅。
如果服务激活器被标记为 async
,实际上任何 Publisher
回复都会发生同样的情况。所以,如果你的 adapter::handleMessage
returns Mono<Void>
,你可以这样做,你会完成 Mono
:
@Bean
public IntegrationFlow reactiveStore() {
return f -> f
.<Object>handle((p, h) -> Mono.empty(),
e -> e.async(true)
.customizeMonoReply((message, mono) ->
mono.doOnSuccess(data -> System.out.println("Completed: " + data))));
}
我在日志中得到了这个:
Completed: null
我可能需要修改一个ReactiveMessageHandlerAdapter
,因为看起来任何ReactiveMessageHandler
都可以直接处理,并且更顺利。请提出 GH 问题,这样我们就不会失去讨论。
同时,这里有一些文档可以让您更加清楚:
我从事 Reactor 项目已有一段时间了,但我对 SI 还很陌生,尤其是在 SI 中使用反应式组件时。 这可能是非常基本的,但我仍然无法理解,而且我阅读 Spring 文档越多,我就越不理解。
我有这个流程,我正在使用 returns Monovoid
或 Mono<Void>
(单向 'MessageHandler'),则表示结束流程,您不能在下游添加任何其他内容。
所以这给我留下了这个:
@Bean
public IntegrationFlow flow(AdapterThatReturnsMonoVoid adapter) {
return f -> f
// transformations, routing and other stuff..
.handle(adapter::handleMessage)
}
现在。将如何完成对该 Mono 的订阅? 如果我想订阅那个 Mono 的频道怎么办? 即使 Mono 没有发出任何项目,我也希望能够订阅完成信号。
例如,如果我使用 MongoDb.reactiveOutboundChannelAdapter(...)
,当我 运行 流时,我可以看到负载实际上存储在 mongo 中。所以我假设订阅是自动完成的?
我想我的目的是获得某种手术完成的收据。
感谢您的帮助!
MongoDb.reactiveOutboundChannelAdapter(...)
由包装到 ReactiveMessageHandlerAdapter
的 ReactiveMongoDbStoringMessageHandler
表示,以真正自动进行订阅。
如果服务激活器被标记为 async
,实际上任何 Publisher
回复都会发生同样的情况。所以,如果你的 adapter::handleMessage
returns Mono<Void>
,你可以这样做,你会完成 Mono
:
@Bean
public IntegrationFlow reactiveStore() {
return f -> f
.<Object>handle((p, h) -> Mono.empty(),
e -> e.async(true)
.customizeMonoReply((message, mono) ->
mono.doOnSuccess(data -> System.out.println("Completed: " + data))));
}
我在日志中得到了这个:
Completed: null
我可能需要修改一个ReactiveMessageHandlerAdapter
,因为看起来任何ReactiveMessageHandler
都可以直接处理,并且更顺利。请提出 GH 问题,这样我们就不会失去讨论。
同时,这里有一些文档可以让您更加清楚: