从 SI 中的反应式单向处理程序订阅 Mono<Void>

Subscribing to Mono<Void> from reactive one-way handlers in SI

我从事 Reactor 项目已有一段时间了,但我对 SI 还很陌生,尤其是在 SI 中使用反应式组件时。 这可能是非常基本的,但我仍然无法理解,而且我阅读 Spring 文档越多,我就越不理解。

我有这个流程,我正在使用 returns Mono 的反应式适配器。 据我了解,一旦您使用 handler/gateway returns voidMono<Void>(单向 'MessageHandler'),则表示结束流程,您不能在下游添加任何其他内容。 所以这给我留下了这个:

  @Bean
  public IntegrationFlow flow(AdapterThatReturnsMonoVoid adapter) {
    return f -> f
     // transformations, routing and other stuff..
     .handle(adapter::handleMessage)      
  }

现在。将如何完成对该 Mono 的订阅? 如果我想订阅那个 Mono 的频道怎么办? 即使 Mono 没有发出任何项目,我也希望能够订阅完成信号。

例如,如果我使用 MongoDb.reactiveOutboundChannelAdapter(...),当我 运行 流时,我可以看到负载实际上存储在 mongo 中。所以我假设订阅是自动完成的?

我想我的目的是获得某种手术完成的收据。

感谢您的帮助!

MongoDb.reactiveOutboundChannelAdapter(...) 由包装到 ReactiveMessageHandlerAdapterReactiveMongoDbStoringMessageHandler 表示,以真正自动进行订阅。

如果服务激活器被标记为 async,实际上任何 Publisher 回复都会发生同样的情况。所以,如果你的 adapter::handleMessage returns Mono<Void>,你可以这样做,你会完成 Mono:

    @Bean
    public IntegrationFlow reactiveStore() {
        return f -> f
                .<Object>handle((p, h) -> Mono.empty(),
                        e -> e.async(true)
                                .customizeMonoReply((message, mono) ->
                                        mono.doOnSuccess(data -> System.out.println("Completed: " + data))));
    }

我在日志中得到了这个:

Completed: null

我可能需要修改一个ReactiveMessageHandlerAdapter,因为看起来任何ReactiveMessageHandler都可以直接处理,并且更顺利。请提出 GH 问题,这样我们就不会失去讨论。

同时,这里有一些文档可以让您更加清楚:

https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#reactive-message-handler

https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#reactive-advice