CompletableFuture.allOff 完成,即使其列表中的一个 CompletableFuture 尚未完成

CompletableFuture.allOff completes even if one CompletableFuture in its list is not yet finished

我有 2 个 CompletableFutures。 task2 只应在 task1 完成后开始。然后,我需要等待所有任务完成。在我下面的代码中,程序在 task1 结束后结束。 task2 开始但没有完成。为什么会发生这种情况的任何想法?另外,为什么列表只包含1个条目,而在代码中,我添加了2个?

代码:

public void testFutures () throws Exception {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    CompletableFuture<Void> task1 = CompletableFuture.supplyAsync( () -> {
      System.out.println(" task1 start");
      try {
        Thread.sleep(5000L);
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      }
      System.out.println(" task1 done");
      return null;

    });

    task1.whenComplete( (x, y) -> {
      CompletableFuture<Void> task2 = CompletableFuture.supplyAsync( () -> {
        System.out.println(" task2 start");
        try {
          Thread.sleep(2000L);
        } catch (InterruptedException ex) {
          ex.printStackTrace();
        }
        System.out.println(" task2 done");
        return null;
      });
      futures.add(task2);
    });
    futures.add(task1);
    // wait for the calls to finish
    try {
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete( (x, y) -> {
        System.out.println(" all tasks done " + futures.size());
      }).get();

    } catch (Exception e) {
      e.printStackTrace();
    }
  }

输出:

 task1 start
 task1 done
 all tasks done 1
 task2 start

你有两个问题。

首先,您已经创建了关于何时将 task2 添加到您的期货列表的竞争条件。在你执行这一行的时候——

CompletableFuture.allOf(...).get();

—我称之为 终止 getter,列表中只有 task1。通过输出它的大小自己看看:

// wait for the calls to finish
try {
    System.out.println("# of futures: " + futures.size()); // 1

task2 仍然 运行s 最终 ,因为你用 whenComplete() 安排了它。但触发它的不是你的终止 getter。

回想一下,我说过这是一个竞争条件。为了自己演示这一点,在终止 getter 之前添加一个 sleep(),如下所示:

try {
  Thread.sleep(6000L);
} catch (InterruptedException ex) {
  ex.printStackTrace();
}
// wait for the calls to finish
try {
    System.out.println("# of futures: " + futures.size()); // 2

那么你已经给了它足够的时间来添加 task2.

但事情是这样的。 现在是终止getter触发两个任务吗?

还是不行! 这是第二个问题:您几乎总是想使用 thenRun()thenAccept()thenApply() 之一, thenCompose() 方法。这些方法链接你的未来,使每个阶段都依赖于前一个阶段,这样你的终止getter 实际上 等待整个链完成。 whenComplete() 是一种启动完全不相关管道的特殊方法,因此不受终止 get().

的影响

在你的情况下,你想使用 thenRun(),像这样:

    task1.thenRun( ignore -> {

好的,那么我们如何结合所有这些?

public static void testFutures () throws Exception {
    
    CompletableFuture<Void> task1 = CompletableFuture.supplyAsync( () -> {
      System.out.println(" task1 start");
      try {
        Thread.sleep(5000L);
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      }
      System.out.println(" task1 done");
      return null;
    });

    CompletableFuture<Void> futuresChain = task1.thenRun( () -> {
      System.out.println(" task2 start");
      try {
        Thread.sleep(2000L);
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      }
      System.out.println(" task2 done");
    });
    
    // wait for the calls to finish
    try {
      futuresChain.thenRun( () -> {
        System.out.println(" all tasks done ");
      }).toCompletableFuture().get();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

输出:

 task1 start
 task1 done
 task2 start
 task2 done
 all tasks done 

你看,第一个任务只需要supplyAsync()。您希望在该任务之后按顺序 运行 task2,因此 thenRun() 将为您进行调度(supplyAsync()ing)。所以你也不需要一系列的期货。 allOf() 用于 运行 并行 任务 ,并等待所有任务完成。

让我们先清理你的代码。

让我们定义一个方法来休眠,这样它就不会把水搅浑:

private static void sleep(int seconds) {
    try {
        Thread.sleep(TimeUnit.SECONDS.toMillis(seconds));
    } catch (InterruptedException ex) {
        throw new RuntimeException(ex);
    }
}

那我们就把任务分开,用正确的方法:

private static CompletableFuture<Void> task1() {

    return CompletableFuture.runAsync(() -> {
        System.out.println(" task1 start");
        sleep(5);
        System.out.println(" task1 done");
    });
}

private static CompletableFuture<Void> task2() {
    return CompletableFuture.runAsync(() -> {
        System.out.println(" task2 start");
        sleep(2);
        System.out.println(" task2 done");
    });
}

您需要了解 CompletableFuture 方法的链接已经完全按照您的要求进行,它们 运行 下一个 阶段,在 上一篇一篇已经结束。您可以通过以下方式使您的代码变得更加简单:

public static void main(String[] args) throws Exception {
    testFutures();
}

private static void testFutures() throws Exception {

    CompletableFuture<Void> both = task1().thenCompose(ignoreMe -> task2());
    both.get();
    System.out.println("both done");

}