Java CompletableFuture.complete() 块
Java CompletableFuture.complete() block
我在 java 中使用 CompletableFuture 时遇到问题。
我有 2 个 select 请求,这些请求在收到来自服务器的响应时被填充。
在连接线程(THREAD-1)(使用reactor)中,我使用:
if(hasException) {
selectFuture.completeExceptionally(new ClientException(errorCode));
} else {
System.out.println("Before complete future");
selectFuture.complete(result);
System.out.println("After complete future");
}
在其他线程 (THREAD-2) 中,我使用:
CompleteFuture.allOf(allSelect).whenComplete((aVoid, throwable) -> {
System.out.println("Receive all future");
// Do sth here
});
我的情况是系统打印出"Receive all future" 但是THREAD-1 在调用future.complete(result);
时被阻塞无法退出该命令。
如果在 THREAD-2 中,我使用 CompletableFuture.allOf(allOfSelect).get()
,THREAD-1 将正确地 运行。但是使用 CompletableFuture.get()
会降低性能,所以我想使用 CompletableFuture.whenComplete()
.
谁能帮我解释一下阻塞的原因?
谢谢!
complete
调用会触发所有相关的 CompletionStage
。
因此,如果您之前使用 whenComplete
注册了 BiConsumer
,complete
将在其调用线程中调用它。在您的情况下,当您传递给 whenComplete
的 BiConsumer
完成时,对 complete
的调用将 return。这在class javadoc
中有描述
Actions supplied for dependent completions of non-async methods may be
performed by the thread that completes the current
CompletableFuture
, or by any other caller of a completion method.
(by another caller 是相反的情况,线程调用 whenComplete
实际上会应用 BiConsumer
if 目标 CompletableFuture
已经完成。)
这里有一个小程序来说明该行为:
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = new CompletableFuture<String>();
future.whenComplete((r, t) -> {
System.out.println("before sleep, executed in thread " + Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after sleep, executed in thread " + Thread.currentThread());
});
System.out.println(Thread.currentThread());
future.complete("completed");
System.out.println("done");
}
这将打印
Thread[main,5,main]
before sleep, executed in thread Thread[main,5,main]
after sleep, executed in thread Thread[main,5,main]
done
显示 BiConsumer
已在主线程中应用,即调用 complete
的线程。
您可以使用 whenCompleteAsync
强制在单独的线程中执行 BiConsumer
。
[...] that executes the given action using this stage's default
asynchronous execution facility when this stage completes.
例如,
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = new CompletableFuture<String>();
CompletableFuture<?> done = future.whenCompleteAsync((r, t) -> {
System.out.println("before sleep, executed in thread " + Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after sleep, executed in thread " + Thread.currentThread());
});
System.out.println(Thread.currentThread());
future.complete("completed");
System.out.println("done");
done.get();
}
将打印
Thread[main,5,main]
done
before sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
after sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
显示 BiConsumer
已在单独的线程中应用。
我在 java 中使用 CompletableFuture 时遇到问题。 我有 2 个 select 请求,这些请求在收到来自服务器的响应时被填充。
在连接线程(THREAD-1)(使用reactor)中,我使用:
if(hasException) {
selectFuture.completeExceptionally(new ClientException(errorCode));
} else {
System.out.println("Before complete future");
selectFuture.complete(result);
System.out.println("After complete future");
}
在其他线程 (THREAD-2) 中,我使用:
CompleteFuture.allOf(allSelect).whenComplete((aVoid, throwable) -> {
System.out.println("Receive all future");
// Do sth here
});
我的情况是系统打印出"Receive all future" 但是THREAD-1 在调用future.complete(result);
时被阻塞无法退出该命令。
如果在 THREAD-2 中,我使用 CompletableFuture.allOf(allOfSelect).get()
,THREAD-1 将正确地 运行。但是使用 CompletableFuture.get()
会降低性能,所以我想使用 CompletableFuture.whenComplete()
.
谁能帮我解释一下阻塞的原因?
谢谢!
complete
调用会触发所有相关的 CompletionStage
。
因此,如果您之前使用 whenComplete
注册了 BiConsumer
,complete
将在其调用线程中调用它。在您的情况下,当您传递给 whenComplete
的 BiConsumer
完成时,对 complete
的调用将 return。这在class javadoc
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current
CompletableFuture
, or by any other caller of a completion method.
(by another caller 是相反的情况,线程调用 whenComplete
实际上会应用 BiConsumer
if 目标 CompletableFuture
已经完成。)
这里有一个小程序来说明该行为:
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = new CompletableFuture<String>();
future.whenComplete((r, t) -> {
System.out.println("before sleep, executed in thread " + Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after sleep, executed in thread " + Thread.currentThread());
});
System.out.println(Thread.currentThread());
future.complete("completed");
System.out.println("done");
}
这将打印
Thread[main,5,main]
before sleep, executed in thread Thread[main,5,main]
after sleep, executed in thread Thread[main,5,main]
done
显示 BiConsumer
已在主线程中应用,即调用 complete
的线程。
您可以使用 whenCompleteAsync
强制在单独的线程中执行 BiConsumer
。
[...] that executes the given action using this stage's default asynchronous execution facility when this stage completes.
例如,
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = new CompletableFuture<String>();
CompletableFuture<?> done = future.whenCompleteAsync((r, t) -> {
System.out.println("before sleep, executed in thread " + Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after sleep, executed in thread " + Thread.currentThread());
});
System.out.println(Thread.currentThread());
future.complete("completed");
System.out.println("done");
done.get();
}
将打印
Thread[main,5,main]
done
before sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
after sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
显示 BiConsumer
已在单独的线程中应用。