How to correctly implement an executor that runs multiple iterations, waits for all tasks to complete, and terminates successfully after the tasks are done

Cut to the chase--------------------

Code demonstrating the accepted answer can be found here:

Full example:

https://github.com/NACHC-CAD/thread-example/tree/shutdown-first

Implementation:

https://github.com/NACHC-CAD/thread-example/blob/shutdown-first/src/main/java/com/nachc/examples/threadexample/WidgetFactory.java

Original post------------------------------------

There are many examples of using Java threads and executors: https://www.baeldung.com/thread-pool-java-and-guava

https://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html

https://howtodoinjava.com/java/multi-threading/java-thread-pool-executor-example/

https://jenkov.com/tutorials/java-concurrency/thread-pools.html

https://xperti.io/blogs/thread-pools-java-introduction/

https://www.journaldev.com/1069/threadpoolexecutor-java-thread-pool-example-executorservice

https://stackify.com/java-thread-pools/

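However, I have not been able to write an example that executes all of its tasks, waits for them to complete, and then terminates correctly.

Starting from this example: https://howtodoinjava.com/java/multi-threading/java-thread-pool-executor-example/

The code only calls executor.shutdown(). If the threads take any time at all, this does not wait for them to finish.

I have created a complete minimal example here: https://github.com/NACHC-CAD/thread-example/tree/await-termination

The shutdown-only branch covers this use case (https://github.com/NACHC-CAD/thread-example/tree/shutdown-only):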

public void makeWidgets() {
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(batchSize);
    log.info("Building " + howMany + " widgets...");
    for (int i = 0; i < howMany; i++) {
        Widget widget = new Widget(lotNumber, i);
        WidgetRunnable runnable = new WidgetRunnable(widget);
        executor.execute(runnable);
    }
    log.info("SHUTTING DOWN----------------");
    executor.shutdown();
}

This code produces the following output (1000 widgets should have been created, and each should report that it is done after waiting 1 second).

2022-04-23 21:27:05,796 21:27:05.796 [main] INFO  (WidgetFactoryIntegrationTest.java:12) - Starting test...
2022-04-23 21:27:05,799 21:27:05.799 [main] INFO  (WidgetFactory.java:29) - Building 100 widgets...
2022-04-23 21:27:05,800 21:27:05.800 [pool-1-thread-2] INFO  (Widget.java:24) - Starting build: 1/1
2022-04-23 21:27:05,800 21:27:05.800 [pool-1-thread-4] INFO  (Widget.java:24) - Starting build: 1/3
2022-04-23 21:27:05,800 21:27:05.800 [pool-1-thread-1] INFO  (Widget.java:24) - Starting build: 1/0
2022-04-23 21:27:05,800 21:27:05.800 [pool-1-thread-5] INFO  (Widget.java:24) - Starting build: 1/4
2022-04-23 21:27:05,800 21:27:05.800 [pool-1-thread-6] INFO  (Widget.java:24) - Starting build: 1/5
2022-04-23 21:27:05,800 21:27:05.800 [pool-1-thread-7] INFO  (Widget.java:24) - Starting build: 1/6
2022-04-23 21:27:05,800 21:27:05.800 [pool-1-thread-8] INFO  (Widget.java:24) - Starting build: 1/7
2022-04-23 21:27:05,800 21:27:05.800 [pool-1-thread-10] INFO  (Widget.java:24) - Starting build: 1/9
2022-04-23 21:27:05,800 21:27:05.800 [pool-1-thread-9] INFO  (Widget.java:24) - Starting build: 1/8
2022-04-23 21:27:05,801 21:27:05.801 [main] INFO  (WidgetFactory.java:35) - SHUTTING DOWN----------------
2022-04-23 21:27:05,800 21:27:05.800 [pool-1-thread-3] INFO  (Widget.java:24) - Starting build: 1/2
2022-04-23 21:27:05,801 21:27:05.801 [main] INFO  (WidgetFactoryIntegrationTest.java:18) - Done.

If I add executor.awaitTermination, the code runs all of the threads but never terminates. This example is in the await-termination branch: https://github.com/NACHC-CAD/thread-example/tree/await-termination

public void makeWidgets() {
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(batchSize);
    log.info("Building " + howMany + " widgets...");
    for (int i = 0; i < howMany; i++) {
        Widget widget = new Widget(lotNumber, i);
        WidgetRunnable runnable = new WidgetRunnable(widget);
        executor.execute(runnable);
    }
    try {
        executor.awaitTermination(1000, TimeUnit.HOURS);
    } catch(Exception exp) {
        throw(new RuntimeException(exp));
    }
    log.info("SHUTTING DOWN----------------");
    executor.shutdown();
}

This code lets all the runnables complete but never exits. How can I let all the runnables complete and have the code run to completion (exit)?

Referring to the ThreadPoolExecutor documentation, the description of the awaitTermination() method reads:

Blocks until all tasks have completed execution after a shutdown request

while the description of the shutdown() method reads:

Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted

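This indicates that awaitTermination() is meant to be called after shutdown(). Without a shutdown request the executor never terminates, so awaitTermination() simply blocks for its full timeout (1000 hours in the code above), even after every task has finished.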
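To see this concretely, here is a minimal sketch (the class and method names are hypothetical, not from the question's repository). It submits a few short tasks, calls awaitTermination() without shutdown(), and then calls it again after shutdown().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: awaitTermination() can only return true after shutdown().
class AwaitWithoutShutdownDemo {

    // Returns {result before shutdown(), result after shutdown()}.
    static boolean[] run() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(50); // simulate a short task
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            // The tasks finish in roughly 100 ms, but without a shutdown request the
            // pool never terminates, so this waits out the full 500 ms and returns false.
            boolean before = executor.awaitTermination(500, TimeUnit.MILLISECONDS);

            executor.shutdown(); // request an orderly shutdown
            // Now the idle pool can terminate, so this returns promptly with true.
            boolean after = executor.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] { before, after };
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("before shutdown(): " + result[0]); // false
        System.out.println("after shutdown():  " + result[1]); // true
    }
}
```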

So the fix for the problem above is to call shutdown() first and then awaitTermination().
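Applied to the question's makeWidgets(), the fix might look like the sketch below. The repository's Widget and WidgetRunnable classes are not reproduced here, so a short sleep plus a counter (the hypothetical `built` field of the hypothetical `WidgetFactorySketch` class) stands in for building a widget.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of makeWidgets() with shutdown() called before awaitTermination().
// The "build" below is a hypothetical stand-in for the repository's WidgetRunnable.
class WidgetFactorySketch {
    private final int batchSize;
    private final int howMany;
    final AtomicInteger built = new AtomicInteger(); // counts finished builds

    WidgetFactorySketch(int batchSize, int howMany) {
        this.batchSize = batchSize;
        this.howMany = howMany;
    }

    void makeWidgets() {
        ExecutorService executor = Executors.newFixedThreadPool(batchSize);
        for (int i = 0; i < howMany; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(10); // pretend to build a widget
                    built.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown(); // 1. stop accepting new tasks; queued tasks still run
        try {
            // 2. block until every queued task has finished, or give up after the timeout
            if (!executor.awaitTermination(1, TimeUnit.HOURS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        WidgetFactorySketch factory = new WidgetFactorySketch(10, 100);
        factory.makeWidgets(); // returns only after all builds are done
        System.out.println("Built: " + factory.built.get()); // Built: 100
    }
}
```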

Note: I have not tested this myself; however, as mentioned in the comments on the original post, John has, and the mechanism works.

The answer above is correct. Here are some additional points and some example code.

For one thing, there is no need to declare and cast to ThreadPoolExecutor directly. Just use the more general ExecutorService interface.

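Also, sizing your thread pool to match your batch size seems unwise. In current Java, you generally want fewer active threads in the pool than you have CPU cores. (This calculus would change radically if Project Loom and its virtual threads succeed, but that is not the reality today, though you can try the early-access build.)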

int threadPoolSize = 3 ;  // Generally less than number of cores. 
ExecutorService executorService = Executors.newFixedThreadPool( threadPoolSize );
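If you would rather derive the pool size from the machine than hard-code it, Runtime.availableProcessors() reports the number of processors available to the JVM. The sketch below uses "cores minus one, but at least one"; that rule of thumb and the `PoolSizing` class name are assumptions of this example, not part of the answer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: size the pool from the available cores, not the batch size.
class PoolSizing {
    // Assumed rule of thumb: leave one core free, but never go below one thread.
    static int suggestedPoolSize() {
        int cores = Runtime.getRuntime().availableProcessors();
        return Math.max(1, cores - 1);
    }

    public static void main(String[] args) {
        int threadPoolSize = suggestedPoolSize();
        ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
        System.out.println("Pool size: " + threadPoolSize);
        executorService.shutdown(); // nothing submitted; let the pool's threads exit
    }
}
```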

Let's simplify your example scenario. We define Widget as a simple record.

record Widget ( UUID id , Instant whenCreated ) {}

Define a task that produces a Widget. We want to get a Widget object back, so we use Callable rather than Runnable.

Callable < Widget > makeWidgetTask = ( ) -> {
    Thread.sleep( Duration.ofMillis( 50 ).toMillis() ); // Pretend that we have a long-running task.
    Widget widget = new Widget( UUID.randomUUID() , Instant.now() );
    return widget;
};
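The difference shows up when the task is submitted: ExecutorService.submit(Runnable) returns a Future<?> whose get() yields null on success, while submit(Callable) returns a Future carrying the task's result. A small sketch, using a simplified, hypothetical int-based Widget record:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableVsRunnable {
    record Widget(int id) {} // simplified stand-in for the answer's Widget

    // Returns {value from the Runnable's Future, value from the Callable's Future}.
    static Object[] run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Runnable runnable = () -> new Widget(1);         // result is thrown away
            Callable<Widget> callable = () -> new Widget(2); // result is returned
            Future<?> runnableFuture = executor.submit(runnable);
            Future<Widget> callableFuture = executor.submit(callable);
            return new Object[] { runnableFuture.get(), callableFuture.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        Object[] results = run();
        System.out.println("Runnable future: " + results[0]); // null
        System.out.println("Callable future: " + results[1]); // Widget[id=2]
    }
}
```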

Make a big collection that contains that task many times, so that it gets run many times.

List < Callable < Widget > > tasks = Collections.nCopies( 1_000 , makeWidgetTask );
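Collections.nCopies returns an immutable list holding the same Callable reference 1,000 times; it does not clone the task. That is fine here because the lambda holds no state: each call() builds a fresh Widget. A quick sketch to confirm that (the `NCopiesDemo` class is hypothetical, and the Widget record is simplified to just a UUID):

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;

class NCopiesDemo {
    record Widget(UUID id) {} // simplified stand-in for the answer's Widget

    // Returns {list size, distinct task objects in the list, distinct widget ids produced}.
    static int[] run() {
        Callable<Widget> makeWidgetTask = () -> new Widget(UUID.randomUUID());
        List<Callable<Widget>> tasks = Collections.nCopies(1_000, makeWidgetTask);

        // Count distinct task objects by identity: nCopies repeats a single reference.
        Set<Callable<Widget>> distinctTasks = Collections.newSetFromMap(new IdentityHashMap<>());
        distinctTasks.addAll(tasks);

        // Each call() still creates a new Widget, so reusing one stateless task is safe.
        Set<UUID> ids = new HashSet<>();
        try {
            for (Callable<Widget> task : tasks) {
                ids.add(task.call().id());
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return new int[] { tasks.size(), distinctTasks.size(), ids.size() };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " tasks, " + r[1] + " distinct task object(s), " + r[2] + " distinct widgets");
    }
}
```

If a task ever carried mutable per-run state, you would need distinct task instances instead of nCopies.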

In practice, submitting the tasks requires a try-catch, because invokeAll can throw InterruptedException.

List < Future < Widget > > futures = null;
try
{
    futures = executorService.invokeAll( tasks );
}
catch ( InterruptedException e )
{
    throw new RuntimeException( e );
}

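This submits all of those tasks to the executor service. Notice how we get back a list of Future objects. Each Future is our handle for tracking the success or failure of its task. Note that invokeAll blocks: it returns only after every task has completed, whether normally or by throwing an exception.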
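Because invokeAll waits for every task, each Future in the returned list is already done when the call returns. The sketch below checks that (the `InvokeAllDemo` name and the 20 ms task are hypothetical). In this particular style of code, the later shutdown/awaitTermination step is mainly what lets the pool's threads exit; the tasks themselves are already finished by then.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllDemo {
    // Submits ten short tasks with invokeAll and reports whether every Future is done.
    static boolean allDoneWhenInvokeAllReturns() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        Callable<Integer> task = () -> {
            Thread.sleep(20); // pretend to work
            return 42;
        };
        try {
            List<Future<Integer>> futures = executor.invokeAll(Collections.nCopies(10, task));
            // invokeAll returns only after every task has completed, so no polling is needed.
            return futures.stream().allMatch(Future::isDone);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown(); // still needed so the pool's threads can exit
        }
    }

    public static void main(String[] args) {
        System.out.println("All done: " + allDoneWhenInvokeAllReturns()); // All done: true
    }
}
```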

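As for how to wait for completion, and how to use ExecutorService#shutdown, shutdownNow, and awaitTermination, just read the Javadoc. It gives you a complete example of the boilerplate code.

Quoting the Javadoc: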

pool.shutdown(); // Disable new tasks from being submitted
try {
    // Wait a while for existing tasks to terminate
    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
        pool.shutdownNow(); // Cancel currently executing tasks
        // Wait a while for tasks to respond to being cancelled
        if (!pool.awaitTermination(60, TimeUnit.SECONDS))
            System.err.println("Pool did not terminate");
    }
} catch (InterruptedException ex) {
    // (Re-)Cancel if current thread also interrupted
    pool.shutdownNow();
    // Preserve interrupt status
    Thread.currentThread().interrupt();
}
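If you need this pattern in more than one place, it can be packaged as a helper method. This is a sketch following the Javadoc's two-phase pattern; the Duration parameter and the boolean result (true if the pool ended up terminated, whether or not tasks had to be cancelled) are additions of this example, and the `ExecutorShutdown` class is hypothetical.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ExecutorShutdown {
    // Two-phase shutdown following the Javadoc pattern; returns pool.isTerminated().
    static boolean shutdownAndAwaitTermination(ExecutorService pool, Duration timeout) {
        long millis = timeout.toMillis();
        pool.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!pool.awaitTermination(millis, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!pool.awaitTermination(millis, TimeUnit.MILLISECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException ex) {
            pool.shutdownNow(); // (Re-)Cancel if current thread also interrupted
            Thread.currentThread().interrupt(); // Preserve interrupt status
        }
        return pool.isTerminated();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> System.out.println("task on " + Thread.currentThread().getName()));
        }
        boolean terminated = shutdownAndAwaitTermination(pool, Duration.ofSeconds(60));
        System.out.println("Terminated: " + terminated); // Terminated: true
    }
}
```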

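The key concept is that shutdown does not stop any work in progress. All currently executing tasks continue. All submitted tasks will eventually be scheduled for execution on a core as threads become available. The shutdown method does only one thing: it stops any further tasks from being submitted to this executor service. Quoting the Javadoc:

shutdown() … previously submitted tasks are executed, but no new tasks will be accepted.

Quoting further:

This method does not wait for previously submitted tasks to complete execution. Use awaitTermination to do that.

So you need to call awaitTermination after calling shutdown. As arguments, pass a reasonable amount of time within which you expect all submitted tasks to be completed, cancelled, or interrupted. awaitTermination returns true if the executor terminated, or false if that time limit elapsed first, in which case you can assume something went wrong.

Note that the call to shutdown does not block, but the call to awaitTermination does.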
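These semantics can be observed directly. In the sketch below (the `ShutdownSemantics` class and its `Result` record are hypothetical), shutdown() returns at once, the six queued tasks still all run, a task submitted after shutdown() is rejected with RejectedExecutionException (the default policy for the pool returned by Executors.newFixedThreadPool), and awaitTermination() is the call that actually waits.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownSemantics {
    record Result(int completed, boolean rejectedAfterShutdown, boolean terminated) {}

    static Result run() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 6; i++) {
            executor.execute(() -> {
                try {
                    Thread.sleep(50); // simulate work
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown(); // returns immediately; running and queued tasks carry on

        boolean rejected = false;
        try {
            executor.execute(() -> { }); // new work is refused after shutdown()
        } catch (RejectedExecutionException e) {
            rejected = true;
        }

        boolean terminated;
        try {
            // This is the call that blocks until the six tasks have finished.
            terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
            terminated = false;
        }
        return new Result(completed.get(), rejected, terminated);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```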

Let's adapt the boilerplate code to our own example.

executorService.shutdown(); // Disable new tasks from being submitted.
try
{
    if ( ! executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
    {
        executorService.shutdownNow(); // Cancel currently executing tasks.
        // Wait a while for tasks to respond to being cancelled.
        if ( ! executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
        { System.err.println( "Executor service did not terminate." ); }
    }
}
catch ( InterruptedException ex )
{
    executorService.shutdownNow();  // (Re-)Cancel if current thread also interrupted
    Thread.currentThread().interrupt();  // Preserve interrupt status
}

Finally, look at our results by examining the collection of Future objects.

System.out.println( "Count futures: " + futures.size() );
for ( Future < Widget > future : futures )
{
    if ( ! future.isDone() ) { System.out.println( "Oops! Task not done: " + future.toString() ); }
    else if ( future.isCancelled() ) { System.out.println( "Bummer. Task cancelled: " + future.toString() ); }
    else // Else task must have completed successfully.
    {
        try
        {
            Widget widget = future.get();
            System.out.println( widget.toString() );
        }
        catch ( InterruptedException e )
        {
            throw new RuntimeException( e );
        }
        catch ( ExecutionException e )
        {
            throw new RuntimeException( e );
        }
    }
}
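The loop above rethrows any ExecutionException. If you would rather report failures and keep going, ExecutionException.getCause() returns the exception the task itself threw. A sketch with one hypothetical failing task (the `FailedTaskDemo` class and the "machine jammed" failure are made up for illustration):

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FailedTaskDemo {
    // Runs one succeeding and one failing task, then summarizes each Future in order.
    static String run() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Callable<String> ok = () -> "widget";
        Callable<String> broken = () -> { throw new IllegalStateException("machine jammed"); };
        try {
            List<Future<String>> futures = executor.invokeAll(List.of(ok, broken));
            StringBuilder report = new StringBuilder();
            for (Future<String> future : futures) {
                try {
                    report.append("ok:").append(future.get()).append(';');
                } catch (ExecutionException e) {
                    // The task's own exception is wrapped; getCause() recovers it.
                    report.append("failed:").append(e.getCause().getMessage()).append(';');
                }
            }
            return report.toString();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // ok:widget;failed:machine jammed;
    }
}
```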

Add some elapsed-time code at the top and bottom.

long start = System.nanoTime();
…
System.out.println( "Elapsed: " + Duration.ofNanos( System.nanoTime() - start ) );

On my M1 MacBook Pro with 8 real cores, on Java 18, this takes about 18 seconds.

Count futures: 1000
Widget[id=56e594bf-75a6-4cf1-83fc-2b671873c534, whenCreated=2022-04-25T07:00:18.977719Z]
Widget[id=11373948-0689-467a-9ace-1e8d57f40f40, whenCreated=2022-04-25T07:00:18.977721Z]
…
Widget[id=d3b11574-6c11-41cc-9f26-c24ad53aa18c, whenCreated=2022-04-25T07:00:36.747058Z]
Widget[id=017ff453-da92-4296-992e-2c2a2ac44ed8, whenCreated=2022-04-25T07:00:36.748571Z]
Elapsed: PT17.906065583S
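As a rough sanity check on that timing: the tasks mostly sleep rather than compute, so the elapsed time is governed by the pool size of 3 rather than by the 8 cores. Running 1,000 tasks of 50 ms each, three at a time, gives a lower bound of:

```latex
t_{\text{elapsed}} \geq \left\lceil \frac{1000}{3} \right\rceil \times 50\ \text{ms} = 334 \times 50\ \text{ms} = 16.7\ \text{s}
```

The measured 17.9 s sits slightly above that bound; the difference is presumably scheduling overhead plus printing the 1,000 results.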

Here is the complete example code, for your copy-paste convenience.

package work.basil.example.threading;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;

public class App
{
    public static void main ( String[] args )
    {
        long start = System.nanoTime();

        int threadPoolSize = 3;  // Generally less than number of cores.
        ExecutorService executorService = Executors.newFixedThreadPool( threadPoolSize );

        record Widget( UUID id , Instant whenCreated )
        {
        }

        Callable < Widget > makeWidgetTask = ( ) -> {
            Thread.sleep( Duration.ofMillis( 50 ).toMillis() ); // Pretend that we have a long-running task.
            Widget widget = new Widget( UUID.randomUUID() , Instant.now() );
            return widget;
        };

        List < Callable < Widget > > tasks = Collections.nCopies( 1_000 , makeWidgetTask );

        List < Future < Widget > > futures = null;
        try
        {
            futures = executorService.invokeAll( tasks );
        }
        catch ( InterruptedException e )
        {
            throw new RuntimeException( e );
        }

        executorService.shutdown(); // Disable new tasks from being submitted.
        try
        {
            if ( ! executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
            {
                executorService.shutdownNow(); // Cancel currently executing tasks.
                // Wait a while for tasks to respond to being cancelled.
                if ( ! executorService.awaitTermination( 60 , TimeUnit.SECONDS ) )
                { System.err.println( "Executor service did not terminate." ); }
            }
        }
        catch ( InterruptedException ex )
        {
            executorService.shutdownNow();  // (Re-)Cancel if current thread also interrupted
            Thread.currentThread().interrupt();  // Preserve interrupt status
        }

        System.out.println( "Count futures: " + futures.size() );
        for ( Future < Widget > future : futures )
        {
            if ( ! future.isDone() ) { System.out.println( "Oops! Task not done: " + future.toString() ); }
            else if ( future.isCancelled() ) { System.out.println( "Bummer. Task cancelled: " + future.toString() ); }
            else // Else task must have completed successfully.
            {
                try
                {
                    Widget widget = future.get();
                    System.out.println( widget.toString() );
                }
                catch ( InterruptedException e )
                {
                    throw new RuntimeException( e );
                }
                catch ( ExecutionException e )
                {
                    throw new RuntimeException( e );
                }
            }
        }

        System.out.println( "Elapsed: " + Duration.ofNanos( System.nanoTime() - start ) );
    }
}