如果 ES 上的项目可以重新提交给 ES,我怎么知道 ExecutorService 何时完成

How do I know when ExecutorService has finished if items on the ES can resubmit to the ES

我的 Java 应用程序处理文件夹中的音乐文件,它旨在并行且独立地处理多个文件夹。为此,每个文件夹都由 ExecutorService 处理,该服务的最大池大小与计算机的 CPU 相匹配。

例如,如果我们有8-CPU台电脑那么可以(理论上)同时处理8个文件夹,如果我们有16-CPU台电脑那么可以同时处理16个文件夹.如果我们只有 1 个 CPU,那么我们将 pool-size 设置为 3,以允许 CPU 在一个文件夹在 I/O.

上被阻止时继续执行某些操作

但是,我们实际上并没有只有一个 ExecutorService,我们有多个,因为每个文件夹都可以经历多个阶段。

Process1(使用ExecutorService1)→ Process2(ExecutorService2)→ Process3(ExecutorService3)

进程 1、2、3 等都实现了 Callable,并且都有自己关联的 ExecutorService。我们启动了一个 FileLoader 进程,它加载文件夹,然后为每个文件夹创建一个 Process1 可调用对象并提交给 Process1 执行程序,对于每个 Process1 可调用对象,它将完成其工作,然后提交给不同的可调用对象,这可能是 Process2、Process3等等,但我们永远不会倒退,例如 Process3 永远不会提交给 Process1。 我们实际上有 12 个进程,但任何特定文件夹都不太可能经历所有 12 个进程

但我意识到这是有缺陷的,因为在 16-CPU 计算机的情况下,每个 ES 的池大小可以为 16,所以我们实际上有 48 个线程 运行,这将只是引起了太多争论。

所以我要做的是让所有进程(Process1、Process2...)使用相同的 ExecutorService,这样我们就只有匹配 CPUs 的工作线程。

但是,在我目前的情况下,我们有一个 SongLoader 进程,它只提交了一个任务(加载所有文件夹),然后我们调用 shutdown(),直到所有内容都提交给 Process0,这才会完成,然后 Process0 上的 shutdown() 将不会成功,直到所有内容都发送到 Process1 等等。

 //Init Services
 services.add(songLoaderService);
 services.add(Process1.getExecutorService());
 services.add(Process2.getExecutorService());
 services.add(Process3.getExecutorService());

 for (ExecutorService service : services)
     //Request Shutdown
     service.shutdown();

     //Now wait for all submitted tasks to complete
     service.awaitTermination(10, TimeUnit.DAYS);
 }
 //...............
 //Finish Off work

但是,如果一切都在同一个 ES 上并且 Process1 正在提交给 Process2,这将不再有效,因为当时调用 shutdown() 并不是 Process1 会提交给 Process2 的所有文件夹,因此它会过早关闭.

那么,当该 ES 上的任务可以提交给同一 ES 上的其他任务时,我如何检测何时使用单个 ExecutorService 完成所有工作?

或者有更好的方法吗?

注意,你可能会想他为什么不把Process1,2 & 3的逻辑合并成一个Process。困难在于,虽然我最初是按文件夹对歌曲进行分组的,但有时歌曲会被分成更小的组,并且它们会被分配到不同的进程中,而不一定是同一个进程,实际上总共有 12 个进程。

基于 Sholms 想法的尝试

主线程

    private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
    private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
    ...
    SongLoader loader = SongLoader.getInstanceOf(parentFolder);
    ExecutorService songLoaderService =  SongLoader.getExecutorService();
    songLoaderService.submit(loader);
    for(Future future : futures)
    {
        try
        {
             future.get();
        }
        catch (InterruptedException ie)
        {
            SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
            getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
        }
        catch(ExecutionException e)
        {
            SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
        }
    }
    songLoaderService.shutdown();

使用来自 MainAnalyserService

的函数提交新任务的流程代码
public void submit(Callable<Boolean> task) //throws Exception
{
    FixSongsController.getFutures().add(getExecutorService().submit(task));
}

它看起来像是在工作,但失败了

java.util.ConcurrentModificationException
    at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
    at java.base/java.util.ArrayList$Itr.next(Unknown Source)
    at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
    at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
    at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
    at java.desktop/javax.swing.SwingWorker.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

我现在松了一口气,我不能让一个线程调用 future.get()(等待完成),同时其他线程正在添加到列表中。

不要shutdown() ExecutorService。相反,创建 Callable 个对象并保留它们创建的 Future 个对象。 现在您可以等待 Future 个对象,而不是等待 ExecutorService 个对象。请注意,现在您将不得不分别等待每个未来的对象,但是如果您只需要知道最后一个对象何时完成,那么您也可以按任何给定的顺序迭代它们并调用 get().

任何任务都可以提交更多任务,并且需要确保将其未来对象放入主线程将监视的队列中。

// put these somewhere public
ConcurrentLinkedQueue<Future<Boolean>> futures = new ConcurrentLinkedQueue<Future<Boolean>>();
ExecutorService executor = ...

void submit(Callable<Boolean> c) {
    futures.add(executor.submit(c));
}

现在您的主线程可以开始提交任务并等待所有任务和子任务:

void mainThread() {
    // add some tasks from main thread
   for(int i=0 ; i<N ; ++i){
        Callable<Boolean> callable = new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                ...
            }
        submit(callable);
    }

    Future<Boolean> head = null;
    while((head=futures.poll()) != null){
        try {
            head.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
    // At this point, all of your tasks are complete including subtasks.
    executor.shutdown();
    executor.awaitTermination(); // should return almost immediately
}

我同意 Shloim 的观点,您在这里不需要多个 ExecutorService 实例 - 只需一个(根据您可用的 CPU 数量调整大小)就足够了,而且实际上是最佳的。其实我觉得你可能不需要ExecutorService;如果您使用信号完整性的外部机制,一个简单的 Executor 就可以完成这项工作。

我会先构建一个 class 来表示整个较大的工作项。如果你需要消费每个子工作项的结果,你可以使用队列,但如果你只想知道是否还有工作要做,你只需要一个计数器。

例如,您可以这样做:

public class FolderWork implements Runnable {
    private final Executor executor;
    private final File folder;

    private int pendingItems;  // guarded by monitor lock on this instance

    public FolderWork(Executor executor, File folder) {
        this.executor = executor;
        this.folder = folder;
    }

    @Override
    public void run() {
        for (File file : folder.listFiles()) {
            enqueueMoreWork(file);
        }
    }

    public synchronized void enqueueMoreWork(File file) {
        pendingItems++;
        executor.execute(new FileWork(file, this));
    }

    public synchronized void markWorkItemCompleted() {
        pendingItems--;
        notifyAll();
    }

    public synchronized boolean hasPendingWork() {
        return pendingItems > 0;
    }

    public synchronized void awaitCompletion() {
       while (pendingItems > 0) {
           wait();
       }
    }
}

public class FileWork implements Runnable {
    private final File file;
    private final FolderWork parent;

    public FileWork(File file, FolderWork parent) {
        this.file = file;
        this.parent = parent;
    }

    @Override
    public void run() {
        try {
           // do some work with the file

           if (/* found more work to do */) {
               parent.enqueueMoreWork(...);
           }
        } finally {
            parent.markWorkItemCompleted();
        }
    }
}

如果您担心 pendingItems 计数器的同步开销,您可以改用 AtomicInteger。然后你需要一个单独的机制来通知等待线程我们已经完成了;例如,您可以使用 CountDownLatch。这是一个示例实现:

public class FolderWork implements Runnable {
    private final Executor executor;
    private final File folder;

    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    public FolderWork(Executor executor, File folder) {
        this.executor = executor;
        this.folder = folder;
    }

    @Override
    public void run() {
        for (File file : folder.listFiles()) {
            enqueueMoreWork(file);
        }
    }

    public void enqueueMoreWork(File file) {
        if (latch.getCount() == 0) {
            throw new IllegalStateException(
                "Cannot call enqueueMoreWork() again after awaitCompletion() returns!");
        }
        pendingItems.incrementAndGet();
        executor.execute(new FileWork(file, this));
    }

    public void markWorkItemCompleted() {
        int remainingItems = pendingItems.decrementAndGet();
        if (remainingItems == 0) {
            latch.countDown();
        }
    }

    public boolean hasPendingWork() {
        return pendingItems.get() > 0;
    }

    public void awaitCompletion() {
       latch.await();
    }
}

你可以这样称呼它:

Executor executor = Executors.newCachedThreadPool(...);
FolderWork topLevel = new FolderWork(executor, new File(...));
executor.execute(topLevel);
topLevel.awaitCompletion();

此示例仅显示一级子工作项,但您可以使用任意数量的子工作项,只要它们都使用相同的 pendingItems 计数器来跟踪剩余的工作量做。

这基本上是@DanielPrydens 的解决方案,但我对其进行了一些修改,以便更清楚地说明如何解决我的特定问题

创建了一个新的 class MainAnalyserService 来处理 ExecutorService 的创建并提供在新的可调用任务时进行计数的能力提交并完成时

public class MainAnalyserService 
{
    public static final int MIN_NUMBER_OF_WORKER_THREADS = 3;
    protected static int BOUNDED_QUEUE_SIZE = 100;

    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    private static final int TIMEOUT_PER_TASK = 30;

    protected  ExecutorService      executorService;

    protected String threadGroup;

    public MainAnalyserService(String threadGroup)
    {
       this.threadGroup=threadGroup;
       initExecutorService();
    }

    protected void initExecutorService()
    {
        int workerSize = Runtime.getRuntime().availableProcessors();
        //Even if only have single cpu we still have multithread so we dont just have single thread waiting on I/O
        if(workerSize< MIN_NUMBER_OF_WORKER_THREADS)
        {
            workerSize = MIN_NUMBER_OF_WORKER_THREADS;
        }

        executorService = new TimeoutThreadPoolExecutor(workerSize,
                new SongKongThreadFactory(threadGroup),
                new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),
                TIMEOUT_PER_TASK,
                TimeUnit.MINUTES);
    }

    public void submit(Callable<Boolean> task) //throws Exception
    {
        executorService.submit(task);
        pendingItems.incrementAndGet();
    }

    public void workDone()
    {
        int remainingItems = pendingItems.decrementAndGet();
        if (remainingItems == 0)
        {
            latch.countDown();
        }
    }

    public void awaitCompletion() throws InterruptedException{
        latch.await();
    }
}

FixSongsController 线程中我们有

analyserService = new MainAnalyserService(THREAD_WORKER);

//SongLoader uses CompletionService when calls LoadFolderWorkers so shutdown wont return until all initial folder submissions completed
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);
songLoaderService.shutdown();

//Wait for all aysnc tasks to complete
analyserService.awaitCompletion();

然后任何 Callable(例如 Process1、Process2 等)调用 submit() 提交新的 Callable ExecutorService 上,然后它必须在完成时调用 workDone(),所以为了确保我这样做,我在 call() 中添加了一个 finally 块每个过程 class 方法

例如

public Boolean call() 
{
    try
    {
        //do stuff
        //Possibly make multiple calls to                      
        FixSongsController.getAnalyserService().submit();
    }
    finally
    {
        FixSongsController.getAnalyserService().workDone();
    }
}