Mono.toFuture() 阻塞了吗?

Is Mono.toFuture() blocking?

来自Official Documentation of Mono#block()据说:

Subscribe to this Mono and block indefinitely until a next signal is received. Returns that value, or null if the Mono completes empty. In case the Mono errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).

所以可以肯定 block() 方法是阻塞的,它不会执行下一行,直到 block() 解决。

但我的困惑是,当我使用 toFuture() 时期望它是非阻塞的,但它的行为与 block 方法完全一样。在 Documentation of Mono#toFuture() 中指出:

Transform this Mono into a CompletableFuture completing on onNext or onComplete and failing on onError.

不太清楚。该文档中没有任何地方说 Mono#toFuture() 正在阻塞

  1. 请确认 toFuture() 方法是阻塞还是非阻塞?
  2. 另外如果是非阻塞那么,哪个线程会负责执行里面的代码CompletableFuture?

更新:添加了代码片段

使用Mono.block()方法:

    long time = System.currentTimeMillis();
    String block = Mono.fromCallable(() -> {
        logger.debug("inside in fromCallable() block()");
        //Upstream httpcall with apache httpClient().
        // which takes atleast 1sec to complete.
        return "Http response as string";
    }).block();
    logger.info("total time needed {}", (System.currentTimeMillis()-time));

    return CompletableFuture.completedFuture(block);

使用Mono.ToFuture()方法:

    long time = System.currentTimeMillis();
    CompletableFuture<String> toFuture = Mono.fromCallable(() -> {
        logger.debug("inside in fromCallable() block()");
        //Upstream httpcall with apache httpClient().
        // which takes atleast 1sec to complete.
        return "Http response as string";
    }).toFuture();
    logger.info("total time needed {}", (System.currentTimeMillis()-time));
    return toFuture;

这两个代码片段的行为完全相同。

是的,你的怀疑是完全正确的。实际上,Mono.block()Mono.toFuture() 立即订阅并将您带出反应式系统。

官方Springblog post给大家说的更清楚

我还建议查看 Monosource code,其中显示 blocktoFuture 立即订阅。

对@ruhul 第一条评论的解释

你把 .toFuture() 放在你的序列后面的那一行会造成代码阻塞。

所以,在下面的代码中

Mono.fromCallable(() -> {
        logger.debug("inside in fromCallable() block()");
        //Upstream httpcall with apache httpClient().
        // which takes atleast 1sec to complete.
        return "Http response as string";

    }).toFuture(); // this line is the blocking code.

一旦你遇到 toFuture(),序列的订阅就开始了,你的代码就从反应上下文中出来了。