CompletableFuture 从内部注入

CompletableFuture injection from the inside



public CompletableFuture<Boolean> getFutureOfMyLongRunningTask() {
    CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
        // ... Some processing here ...
        if (somecondition failed)
            return false; // Task failed!

        return true; // OK
    }).thenApplyAsync((Boolean result) -> {
        if (!result) // check of previous stage fail
            return false;

        // ... Some processing here ...

        if (!some condition satisfied) {
            // This is where I want the injection to happen. 
            // This stage should be suspended and a new stage should be injected between this point and the next stage.

        return true; // OK
    }).thenApplyAsync((Boolean result) -> {
        if (!result) // check of previous stage fail
            return false;

        // ... Some processing here ...

        return true; // OK

    // This is the result we have to wait for.
    return future;

在注入点 if (!some condition satisfied),我想 运行 一个查询(比方说)需要 5 秒来执行并检索最后阶段所需的一些数据。我不想阻塞线程 5 秒,例如在 if 中使查询同步,我希望它 运行 异步,当结果返回时直接进入下一阶段。我遇到的问题是条件仅在 内部 链中已知。




关键是在注入点我想发布类似的东西(抱歉,Datastax Java Cassandra 驱动程序 代码片段):

ResultSetFuture rsFuture = session.executeAsync(query);

并将那个未来注入链中。这将使调用线程 "free" 执行其他事情,而不是坐下来等待结果。



for (int i = 0; i < 1000; i++) {

此循环仅存在于主线程中,但每次调用函数 都会在线程池 P 中排队 一个新任务。现在假设 P 是一个大小为 1 的固定线程池。也就是说P中只有一个线程,只能处理1个任务。然而,主循环会将所有 1000 个任务排入队列。然后主循环将需要等待所有任务完成。

现在假设 1st 任务中的 1000 任务需要执行长时间的数据库查询。我们现在有两个选择:

  1. 查询在sync内部处理线程(属于线程池 P).这意味着我只需在 if (!some condition satisfied) 块内发出查询并等待结果。这有效地阻止 任务处理,因为线程池P 没有空闲 线程。唯一一个是 busy blocked on IO.

  2. 查询在async内部处理线程中执行(属于线程池 P)。这意味着我在 if (!some condition satisfied) 块内发出查询并立即返回我将收听的未来(可能 DB 驱动程序将产生另一个线程并阻塞 that 线程等待结果)。但是,属于 P 的线程现在 空闲 可以处理 至少 另一个任务。

在我看来,选项 2 优于选项 1,同样的推理也适用于大小为 > 的线程池1 或动态大小。



而不是使用 thenApplyAsync,而是使用 thenComposethenComposeAsync,这使得函数 return 成为 CompletableFuture<Foo> 而不是 Foo .而不是 return true 如果 some condition 满足,你需要 return CompletableFuture.completedFuture(true).

public CompletableFuture<Boolean> getFutureOfMyLongRunningTask() {
    CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
        // ... Some processing here ...
        if (somecondition failed)
            return false; // Task failed!

        return true; // OK
    }).thenComposeAsync((Boolean result) -> {
        if (!result) // check of previous stage fail
            return CompletableFuture.completedFuture(false);

        // ... Some processing here ...

        if (!some condition satisfied) {
            return runSomeOtherQuery()

        return CompletableFuture.completedFuture(true); // OK
    }).thenApplyAsync((Boolean result) -> {
        if (!result) // check of previous stage fail
            return false;

        // ... Some processing here ...

        return true; // OK

    // This is the result we have to wait for.
    return future;

public CompletableFuture<Boolean> runSomeOtherQuery() {




public CompletableFuture<Boolean> getFutureOfMyLongRunningTask() {
    CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
        // First stage processing here ...
        if (somecondition failed)
            return false; // Task failed!

        return true; // OK
    }).thenApplyAsync((Boolean result) -> {
        if (!result) // check of previous stage fail
            return false;

        // Second stage processing here ...

        if (!some condition satisfied) {
            // This is where I want the injection to happen. 
            // This stage should be suspended and a new stage should be
            // injected between this point and the next stage.

        return true; // OK
    }).thenApplyAsync((Boolean result) -> {
        if (!result) // check of previous stage fail
            return false;

        // Third stage processing here ...

        return true; // OK

    // This is the result we have to wait for.
    return future;


public CompletableFuture<Boolean> getFutureOfMyLongRunningTask() {
    CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
        // First stage processing here ...
        if (somecondition failed)
            return false; // Task failed!
        // Second stage processing here ...

        if (!some condition satisfied) {
            // alternative "injected" stage processing
            if(injected stage failed)
                return false;
        // Third stage processing here ...

        return true; // OK

    // This is the result we have to wait for.
    return future;
