编写 Java CompletableFutures 时使用哪个执行器?

Which executor is used when composing Java CompletableFutures?

我在某个存储库 class 上有一个方法 return 是 CompletableFuture。完成这些 futures 的代码使用了一个阻塞的第三方库。我打算有一个单独的有界 Executor,这个存储库 class 将使用它来进行这些阻塞调用。

这是一个例子:

public class PersonRepository {
    private Executor executor = ...
    public CompletableFuture<Void> create(Person person) {...}
    public CompletableFuture<Boolean> delete(Person person) {...}
}

我的应用程序的其余部分将组合这些期货并对结果做一些其他事情。当这些其他功能提供给 thenAcceptthenComposewhenComplete 等时,我不希望它们在存储库的 Executor 上 运行 .

另一个例子:

public CompletableFuture<?> replacePerson(Person person) {
    final PersonRepository repo = getPersonRepository();
    return repo.delete(person)
        .thenAccept(existed -> {
            if (existed) System.out.println("Person deleted"));
            else System.out.println("Person did not exist"));
        })
        .thenCompose(unused -> {
            System.out.println("Creating person");
            return repo.create(person);
        })
        .whenComplete((unused, ex) -> {
            if (ex != null) System.out.println("Creating person");
            repo.close();
        });
}

JavaDoc 指出:

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

附带问题: 为什么这里有 ?在什么情况下有另一个完成方法的调用者没有完成当前的未来?

主要问题:如果我希望所有 println 由不同于存储库使用的 Executor 执行,哪些方法可以我需要异步并手动提供执行程序?

显然 thenAccept 需要更改为 thenAcceptAsync,但我不确定从那一点开始。

替代问题:哪个线程完成了 thenCompose 的 returned 未来?

我的猜测是,它将是从函数参数中完成未来 return 的任何线程。换句话说,我还需要将 whenComplete 更改为 whenCompleteAsync.

也许我把事情复杂化了,但这感觉可能会变得非常棘手。我需要非常注意所有这些期货的来源。同样从设计的角度来看,如果我 return 一个未来,我如何防止调用者使用我的执行者?感觉就像它打破了封装。我知道 Scala 中的所有转换函数都采用隐式 ExecutionContext 似乎可以解决所有这些问题。

附带问题:如果您将中间 CompletionStage 分配给一个变量并在其上调用一个方法,它将在同一线程上执行。

主要问题:只有第一个,所以将 thenAccept 更改为 thenAcceptAsync -- 以下所有步骤都将在用于接受的线程上执行它们的步骤。

替代问题:从 thenCompose 完成未来的线程与用于撰写的线程相同。

您应该将 CompletionStages 视为在同一线程上快速连续执行的步骤(通过按顺序应用函数),除非您特别希望该步骤在不同的线程上执行,使用异步.然后所有后续步骤都在该新线程上完成。

在您当前的设置中,步骤将像这样执行:

Thread-1: delete, accept, compose, complete

有了第一个accept async,就变成了:

Thread-1: delete
Thread-2: accept, compose, complete

关于你的最后一个问题,如果你的调用者添加了额外的步骤,他们会使用同一个线程——我认为除了不返回 CompletableFuture 之外,你无能为力,但是正常的 Future.

根据我在玩弄它时的经验观察,执行这些非异步方法的线程将取决于哪个先发生,thenCompose 本身或 Future 背后的任务。

如果 thenCompose 首先完成(在您的情况下,这几乎是肯定的),那么该方法将 运行 在执行 Future 任务的同一线程上。

如果您 Future 背后的任务先完成,那么该方法将 运行 立即在调用线程上(即根本没有执行程序)。