Java 执行者服务:等待所有任务完成

Java executor service: Waiting for all tasks to finish

我正在尝试在我的程序中引入并发。程序结构是这样的:

    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
    List<String> initialData = dao.fetchFromDB("input");
    Queue queue = new MyQueue();
    queue.add(initialData);

    while(queue.length() > 0) {
        int startingLength = queue.length();
        for (int i = 0; i < startingLength; i++) {
            String input = queue.remove();
            if(input.equals("some value")) {
              missionAccomplished = true;
              break;
            } else {
                  MyRunnable task = new MyRunnable(input, queue, dao);
                  executor.execute(task);
            }
            
        }
        if(missionAccomplished) {
           break;
        } 
        executor.shutdown();
    }
 

所以队列中包含了需要一个一个处理的数据。在 while 循环中,我 运行 一个 for 循环,它从队列中一个接一个地依次选取数据并对其执行一些检查,如果检查失败,我将使用这些数据创建一个 运行nable 任务并将其移交给executor(因为DB操作比较耗时,所以想用并行)。 for 循环在 while 的给定迭代中仅选择特定长度的数据。 我想要实现的是 'while' 循环仅在当前迭代中提交给执行程序的所有任务都完成时才进入下一次迭代。

如何实现?

尝试使用 Project Loom

中的资源

您问的是:

What I want to achieve is that 'while' loop goes to next iteration only when all tasks submitted to executor in current iteration are finished.

Project Loom 承诺会让这更简单。

Loom 项目带来的变化之一是 ExecutorService interface is a sub-interface of AutoCloseable. This means we can use try-with-resources 语法。 try-with-resources 会自动阻塞,直到所有提交的任务都完成/failed/canceled — 正是您所要求的。

此外,执行器服务在退出try时自动关闭。这些更改意味着您的代码变得更简单、更清晰。

此外,对于经常阻塞的代码,例如数据库访问,您会发现使用虚拟线程(a.k.a。fibers)性能显着提高.虚拟线程是 Project Loom 的另一个新特性。要获得此功能,请调用 Executors.newVirtualThreadExecutor.

Loom 项目的实验性构建 available now,基于抢先体验 Java 17. Loom 团队正在寻求反馈。有关详细信息,请参阅 Oracle 的 Ron Pressler 最近的演讲和访谈。

System.out.println( "INFO - executor service about to start. " + Instant.now() );
try (
        ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
)
{
    for ( int i = 0 ; i < 7 ; i++ )
    {
        executorService.submit( ( ) -> System.out.println( Instant.now() ) );
    }
}
// Notice that when reaching this point we block until all submitted tasks still running are fin
// because that is the new behavior of `ExecutorService` being `AutoCloseable`.
System.out.println( "INFO - executor service shut down at this point. " + Instant.now() );

当运行.

INFO - executor service about to start. 2021-02-08T06:27:03.500093Z
2021-02-08T06:27:03.554440Z
2021-02-08T06:27:03.554517Z
2021-02-08T06:27:03.554682Z
2021-02-08T06:27:03.554837Z
2021-02-08T06:27:03.555015Z
2021-02-08T06:27:03.555073Z
2021-02-08T06:27:03.556675Z
INFO - executor service shut down at this point. 2021-02-08T06:27:03.560723Z