队列并行处理的停止条件,其中任务可能产生更多任务

stopping condition for parallel processing of queue, where tasks may generate more tasks

我正在使用 PriorityBlockingQueue<T> q 来处理任务的并行处理。队列由一些任务初始化,每个任务的处理可能会产生更多的任务,这些任务将被添加到队列中。我想并行处理任务,并在处理完所有任务后停止。当然,如果其他线程仍在处理任务,队列可能会在我们完成之前暂时变空。

我的问题是:在 Java 中,什么是好的(当然是正确的,但也是优雅的、惯用的,没有不必要的锁定或等待队列为空时的等待)方法?

备注:

  1. 我正在使用优先级队列,但任务可以按任何顺序处理(按照优先级指定的大致顺序处理任务会有一些好处 - 但我认为这样做是安全的忽略这一点)。

  2. This answer ("use the Task Parallel Library") seems to address the issue for C#, but it seems that this doesn't exist for Java. This 本质上是同一个问题;好像没有完全满意的答案...

  3. 每个任务的处理过程都比较冗长,任务管理的开销多一点也是可以的,如果能让代码更优雅(这也是我乐于拥有的原因)负责从队列中轮询任务并将其分配给工作人员的单个线程)

示例: 作为一个粗略的近似值,我正在尝试使用并行 BFS 来查找动态生成的树的深度。您可以将其视为寻找一种可以在玩游戏时最大化您的奖励的策略,您在游戏中的每一步都会得到一分。每个状态都是一个待处理的任务。您从初始(根)状态开始,计算所有移动(此计算很长,可能会产生数千个移动),并将这些移动达到的状态添加为要探索的任务。

我意识到一个解决方案可能应该允许所有线程递归提交新任务,这让我想到了 this answer

这是该答案的完整版本,它处理二叉树的遍历,通过从某个字符串开始并将其大致减半获得。 为了支持优先级,可以简单地修改 MyTask.run() 以从一些辅助 PriorityBlockingQueue 中弹出字符串,我决定省略它,因为它只会混淆解决方案的本质。

        int nProcessors = Runtime.getRuntime().availableProcessors();
        System.out.println("number of processors found " + nProcessors);
        final ExecutorService taskExecutor = Executors.newFixedThreadPool(nProcessors);

        final Phaser phaser = new Phaser();
        int phase = phaser.getPhase();

        class MyTask implements Runnable {
            private String input;

            public MyTask(String input) {
//        this.es = es;
                this.input = input;
            }

            public void run() {
                System.out.println("running on \"" + this.input + "\" invoked in thread " + Thread.currentThread().getName());

                int l = this.input.length();
                if (! ((l == 0) || (l == 1))) {
                    String beginning = this.input.substring(0, l / 2);
                    String ending = this.input.substring(l / 2);
                    phaser.register();
                    phaser.register();
                    taskExecutor.execute(new MyTask(beginning));
                    taskExecutor.execute(new MyTask(ending));
                }
                phaser.arrive();
            }
        }

        phaser.register();
        taskExecutor.execute(new MyTask("a lengthy string to be processed goes here"));
        System.out.println("MAIN: initial task submitted at " + Thread.currentThread().getName() + " awaiting phaseAdvance");
        phaser.awaitAdvance(phase);
        System.out.println("MAIN: all tasks arrived, ok to shutdown?");
        try {
            taskExecutor.shutdown();
            System.out.println("MAIN: sutting down awaiting termination...");
            taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            System.out.println("MAIN: await termination finished.");
        } catch (InterruptedException e) {
            System.out.println("caught an interrupted excepion " + e);
        }

输出为

number of processors found 4
MAIN: initial task submitted at main awaiting phaseAdvance
running on "a lengthy string to be processed goes here" invoked in thread pool-1-thread-1
running on "a lengthy string to b" invoked in thread pool-1-thread-2
running on "e processed goes here" invoked in thread pool-1-thread-3
running on "e processe" invoked in thread pool-1-thread-3
running on "a lengthy " invoked in thread pool-1-thread-4
running on "string to b" invoked in thread pool-1-thread-3
running on "d goes here" invoked in thread pool-1-thread-3
running on "e pro" invoked in thread pool-1-thread-3
running on "cesse" invoked in thread pool-1-thread-3
.
.
.
running on "b" invoked in thread pool-1-thread-2
running on " " invoked in thread pool-1-thread-1
running on "e" invoked in thread pool-1-thread-4
running on "i" invoked in thread pool-1-thread-1
running on "h" invoked in thread pool-1-thread-2
running on " " invoked in thread pool-1-thread-3
running on "n" invoked in thread pool-1-thread-4
MAIN: all tasks arrived, ok to shutdown?
MAIN: sutting down awaiting termination...
MAIN: await termination finished.