哪种线程机制用于将其他任务排入队列的任务?

Which threading mechanism to use for tasks that enqueue other tasks?

我正在使用创建其他任务的任务。这些任务可能会或可能不会创建后续任务。我事先不知道总共会创建多少个任务。在某个时候,不再创建任何任务,所有任务都将完成。

当最后一个任务完成后,我必须做一些额外的事情。

应该使用哪种线程机制?我读过 CountDownLatch, Cyclic Barrier and Phaser,但 none 似乎合适。

我也尝试过使用ExecutorService,但是我遇到了一些问题,比如最后无法执行某些东西,你可以在下面看到我的尝试:

import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class Issue {

  public static void main(String[] args) throws InterruptedException {
    var count = new AtomicInteger(1);
    var executor = Executors.newFixedThreadPool(3);
    class Task implements Runnable {
      final int id = count.getAndIncrement();
      @Override
      public void run() {
        try {
          MILLISECONDS.sleep((long)(Math.random() * 1000L + 1000L));
        } catch (InterruptedException e) {
          // Do nothing
        }
        if (id < 5) {
          executor.submit(new Task());
          executor.submit(new Task());
        }
        System.out.println(id);
      }
    }
    executor.execute(new Task());
    executor.shutdown();
//    executor.awaitTermination(20, TimeUnit.SECONDS);
    System.out.println("Hello");
  }
}

这会输出异常,因为任务是在调用 shutdown() 之后添加的,但预期输出类似于:

1
2
3
4
5
6
7
8
9
Hello

哪种线程机制可以帮助我做到这一点?

这似乎很棘手。如果甚至有一个任务在队列中或当前正在执行,那么由于您不能说它是否会产生另一个任务,因此您无法知道它可能 运行 多长时间。这可能是需要另外 2 小时的一系列任务的开始。

我认为您实现此目标所需的所有信息都已封装在执行程序实现中。您需要知道 运行ning 和队列中的内容。

我认为您很遗憾地考虑必须编写自己的执行程序。它不需要很复杂,如果你不想的话,它也不必符合 JDK 的接口。只是维护线程池和任务队列的东西。添加将侦听器附加到执行程序的功能。当队列为空并且没有正在执行的任务时,您可以通知监听器。

这是一个快速代码草图。

class MyExecutor
{
    private final AtomicLong taskId = new AtomicLong();
    private final Map<Long, Runnable> idToQueuedTask = new ConcurrentHashMap<>();
    private final AtomicLong runningTasks  = new AtomicLong();
    private final ExecutorService delegate = Executors.newFixedThreadPool(3);

    public void submit(Runnable task) {
        long id = taskId.incrementAndGet();
        final Runnable wrapped = () -> {
            taskStarted(id);
            try {
                task.run();
            }
            finally {
                taskEnded();
            }
        };
        idToQueuedTask.put(id, wrapped);
        delegate.submit(wrapped);
    }
    
    private void taskStarted(long id) {
        idToQueuedTask.remove(id);
        runningTasks.incrementAndGet();
    }
    
    private void taskEnded() {
        final long numRunning = runningTasks.decrementAndGet();
        if (numRunning == 0 && idToQueuedTask.isEmpty()) {
            System.out.println("Done, time to notify listeners");
        }
    }
    
    public static void main(String[] args) {
        MyExecutor executor = new MyExecutor();
        executor.submit(() -> {
            System.out.println("Parent task");
            
            try {
                Thread.sleep(1000);
            }
            catch (Exception e) {}
            
            executor.submit(() -> {
                System.out.println("Child task");
            });
        });
    }
}

如果您将 ExecutorService 更改为:

ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);

然后您可以使用计数函数等待:

    while(executor.getTaskCount() > executor.getCompletedTaskCount())
    {
        TimeUnit.SECONDS.sleep(10L);
    }
    executor.shutdown();
    System.out.println("Hello");