CompletableFuture 是否保证 运行 取消一个新线程?

Is CompletableFuture guaranteed to run un a new thread?

给定一些任意上下文(例如 Junit 单元测试,不具体只是不一定是“主”线程)。

像这样的代码是否必须引入至少 2 个线程?

public static void main (String[] args)
{
    CompletableFuture<Void> s = new CompletableFuture<>();
    CompletableFuture<Void> f = new CompletableFuture<>();
    
    CompletableFuture<Void> someContext =  CompletableFuture.supplyAsync(() ->
    {
        try{    
          System.out.println(Thread.currentThread().getId());
          CompletableFuture<String> update =
          CompletableFuture.supplyAsync(
            () -> {
              String ans = null;
              try {
                System.out.println(Thread.currentThread().getId());
                ans = "Hello";
              } catch (Exception e) {
                ans = e.toString();
              } finally {
                s.complete(null);
                return ans;
              }
            });
          s.get();
          System.out.println(s.isDone());
        } catch (Exception e) {
            System.out.println("Some error");
            return null;
        }
        return null;
    });
    
    System.out.println(f.isDone());
}

当我们在 someContext 中到达 s.get() 时,JVM 能否检测到它正在等待 -> 上下文切换到 update 完成它,然后切换回 someContext

当 运行 它在 ideone 中时,它始终在两个不同的线程中运行它们,但这只是一次观察。

我想了解 language/runtime 提供的保证。

不,此保证不存在。

你所做的是不安全的。

如果您查看 CompletableFuture.supplyAsync(Supplier) 的文档:

Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier.

你可以看到它使用了从ForkJoinPool.commonPool()获得的ForkJoinPool

来自 ForkJoinPool 的文档:

all threads in the pool attempt to find and execute tasks submitted to the pool and/or created by other active tasks (eventually blocking waiting for work if none exist). This enables efficient processing when most tasks spawn other subtasks (as do most ForkJoinTasks), as well as when many small tasks are submitted to the pool from external clients. Especially when setting asyncMode to true in constructors, ForkJoinPools may also be appropriate for use with event-style tasks that are never joined. All worker threads are initialized with Thread.isDaemon() set true. A static commonPool() is available and appropriate for most applications. The common pool is used by any ForkJoinTask that is not explicitly submitted to a specified pool. Using the common pool normally reduces resource usage (its threads are slowly reclaimed during periods of non-use, and reinstated upon subsequent use). For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors

这意味着提交的任务可以在任意数量的线程(默认处理器数量)中执行,并且这些线程会被重新使用。如果所有这些线程都很忙,则执行可能会等待之前的执行完成。

由于公共池也可能被应用程序的其他部分使用,因此提交的任务应该 运行 很快并且不应阻塞,以便其他任务可以快速执行。

虽然 OpenJDK has a special handling for ForkJoinPool in CompletableFuture#get 可以确保在此期间可以执行其他任务,但其他 JDK 可能不提供此功能。

备选方案:异步处理

您可能想要使用 CompletableFuture#thenAcceptAsync(Consumer) 之类的方法,而不是使用 .get() 进行阻止。这 运行 是未来完成后的 Consumer

此外,您可以使用CompletionStage#exceptionally以异步方式处理异常。

public static void main (String[] args) throws java.lang.Exception
{
    CompletableFuture<Void> s = new CompletableFuture();
    CompletableFuture<Void> f = new CompletableFuture();
    
    CompletableFuture<Void> someContext =  CompletableFuture.supplyAsync(() ->
    {
        
          System.out.println(Thread.currentThread().getId());
          CompletableFuture<String> update =
          CompletableFuture.supplyAsync(
            () -> {
              String ans = null;
              try {
                System.out.println(Thread.currentThread().getId());
                ans = "Hello";
              } catch (Exception e) {
                ans = e.toString();
              } finally {
                s.complete(null);
                return ans;
              }
            });
          s thenSupplyAsync(result->{
              System.out.println(s.isDone());
          }).exceptionally(e->{
              System.out.println("Some error");
              return null;
          });
        
        return null;
    });
    
    System.out.println(f.isDone());
}

答案有点复杂,因为正确的答案是“视情况而定”。在您当前的配置中,您将始终获得两个线程;但那是 ForkJoinPool.

的神器

当来自 ForkJoinPool 的某个线程阻塞时,内部会产生一个新线程 ,以便并行度保持正确。

如果我们将您的示例更改为:

,则可以证明这一点
public static void main (String[] args) {

    ForkJoinPool pool = new ForkJoinPool(1);

    CompletableFuture<Void> s = new CompletableFuture<>();
    CompletableFuture<Void> f = new CompletableFuture<>();

    CompletableFuture<Void> someContext =  CompletableFuture.supplyAsync(() ->
    {
        try{
            System.out.println("someContextName : " + Thread.currentThread().getName());
            CompletableFuture<String> update =
                    CompletableFuture.supplyAsync(
                            () -> {
                                String ans = null;
                                try {
                                    System.out.println("update name : " + Thread.currentThread().getName());
                                    ans = "Hello";
                                } catch (Exception e) {
                                    ans = e.toString();
                                } finally {
                                    s.complete(null);
                                    return ans;
                                }
                            }, pool);
            s.get();
            System.out.println(s.isDone());
        } catch (Exception e) {
            System.out.println("Some error");
            return null;
        }
        return null;
    }, pool);

    System.out.println(f.isDone());

    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
}

请注意,即使 ForkJoinPool 是使用单个线程创建的,这仍会完成,因为将创建一个新线程。

另一方面,如果您更改:

ExecutorService pool = Executors.newFixedThreadPool(1);

你的代码会阻塞。