Java CompletableFuture.complete() 块

Java CompletableFuture.complete() block

我在 java 中使用 CompletableFuture 时遇到问题。 我有 2 个 select 请求,这些请求在收到来自服务器的响应时被填充。

在连接线程(THREAD-1)(使用reactor)中,我使用:

if(hasException) {
   selectFuture.completeExceptionally(new ClientException(errorCode));
} else {
   System.out.println("Before complete future");
   selectFuture.complete(result);
   System.out.println("After complete future");
}

在其他线程 (THREAD-2) 中,我使用:

CompleteFuture.allOf(allSelect).whenComplete((aVoid, throwable) -> {
   System.out.println("Receive all future");
   // Do sth here
});

我的情况是系统打印出"Receive all future" 但是THREAD-1 在调用future.complete(result); 时被阻塞无法退出该命令。 如果在 THREAD-2 中,我使用 CompletableFuture.allOf(allOfSelect).get(),THREAD-1 将正确地 运行。但是使用 CompletableFuture.get() 会降低性能,所以我想使用 CompletableFuture.whenComplete().

谁能帮我解释一下阻塞的原因?

谢谢!

complete 调用会触发所有相关的 CompletionStage

因此,如果您之前使用 whenComplete 注册了 BiConsumercomplete 将在其调用线程中调用它。在您的情况下,当您传递给 whenCompleteBiConsumer 完成时,对 complete 的调用将 return。这在class javadoc

中有描述

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

(by another caller 是相反的情况,线程调用 whenComplete 实际上会应用 BiConsumer if 目标 CompletableFuture 已经完成。)

这里有一个小程序来说明该行为:

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future = new CompletableFuture<String>();
    future.whenComplete((r, t) -> {
        System.out.println("before sleep, executed in thread " + Thread.currentThread());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after sleep, executed in thread " + Thread.currentThread());
    });

    System.out.println(Thread.currentThread());
    future.complete("completed");
    System.out.println("done");
}

这将打印

Thread[main,5,main]
before sleep, executed in thread Thread[main,5,main]
after sleep, executed in thread Thread[main,5,main]
done

显示 BiConsumer 已在主线程中应用,即调用 complete 的线程。

您可以使用 whenCompleteAsync 强制在单独的线程中执行 BiConsumer

[...] that executes the given action using this stage's default asynchronous execution facility when this stage completes.

例如,

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future = new CompletableFuture<String>();
    CompletableFuture<?> done = future.whenCompleteAsync((r, t) -> {
        System.out.println("before sleep, executed in thread " + Thread.currentThread());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after sleep, executed in thread " + Thread.currentThread());
    });

    System.out.println(Thread.currentThread());
    future.complete("completed");
    System.out.println("done");
    done.get();
}

将打印

Thread[main,5,main]
done
before sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
after sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]

显示 BiConsumer 已在单独的线程中应用。