在保持非阻塞的情况下有序执行许多 CompletableFuture.allof()
Ordered execution of many CompletableFuture.allof() while staying non-blocking
我遇到过这种情况,其中有 10 个或更多任务被分成许多组。在这些组中,所有内容都应该 运行 同时进行,但是因为每个组都需要前一组的结果(第一组除外),所以我需要按顺序 运行 它们(组内的任务不需要按顺序 运行)。
任务本身是从数据库中查询数据,然后应用一些转换并将其保存回数据库。
Task 1.1 // This group run first
Task 1.2
Task 2.1 // Waiting results from group 1
Task 2.2
Task 2.3
Task 3.1 // Waiting results from group 2
我正在考虑使用 allOf()
的列表,迭代它然后为每个 allOf()
显式调用 get()
,但它会阻止我不希望它发生了,所以我的问题是,如何按顺序执行许多 allOf()
?甚至可以在这里只使用 CompletableFuture
吗?
当您使用 allOf()
时,它 returns 一个 CompletableFuture
只有在所有给定的完成阶段都完成后才会完成。
如果您从返回的 future 链接调用,则可以保证在传递给 allOf()
的任何完成阶段对 get()
的调用永远不会阻塞(因为它们已经完成) .
// First group
CompletableFuture<Integer> task11 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> task12 = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<Integer> task13 = CompletableFuture.supplyAsync(() -> 1729);
// this one will complete after all tasks from the first group complete
CompletableFuture<Void> allFirstTasks = CompletableFuture.allOf(task11, task12, task13);
// Second group will be child tasks from the first group
CompletableFuture<Integer> task21 = allFirstTasks.thenApply(__ ->
task11.join() + task12.join() + task13.join() // will not block
);
注意:使用 join()
而不是 get()
以避免处理已检查的异常。
我遇到过这种情况,其中有 10 个或更多任务被分成许多组。在这些组中,所有内容都应该 运行 同时进行,但是因为每个组都需要前一组的结果(第一组除外),所以我需要按顺序 运行 它们(组内的任务不需要按顺序 运行)。
任务本身是从数据库中查询数据,然后应用一些转换并将其保存回数据库。
Task 1.1 // This group run first
Task 1.2
Task 2.1 // Waiting results from group 1
Task 2.2
Task 2.3
Task 3.1 // Waiting results from group 2
我正在考虑使用 allOf()
的列表,迭代它然后为每个 allOf()
显式调用 get()
,但它会阻止我不希望它发生了,所以我的问题是,如何按顺序执行许多 allOf()
?甚至可以在这里只使用 CompletableFuture
吗?
当您使用 allOf()
时,它 returns 一个 CompletableFuture
只有在所有给定的完成阶段都完成后才会完成。
如果您从返回的 future 链接调用,则可以保证在传递给 allOf()
的任何完成阶段对 get()
的调用永远不会阻塞(因为它们已经完成) .
// First group
CompletableFuture<Integer> task11 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> task12 = CompletableFuture.supplyAsync(() -> 42);
CompletableFuture<Integer> task13 = CompletableFuture.supplyAsync(() -> 1729);
// this one will complete after all tasks from the first group complete
CompletableFuture<Void> allFirstTasks = CompletableFuture.allOf(task11, task12, task13);
// Second group will be child tasks from the first group
CompletableFuture<Integer> task21 = allFirstTasks.thenApply(__ ->
task11.join() + task12.join() + task13.join() // will not block
);
注意:使用 join()
而不是 get()
以避免处理已检查的异常。