Synchronous task producer/consumer using ThreadPoolExecutor

I'm trying to find a way to use a ThreadPoolExecutor in the following scenario:

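To give more context, I currently just submit all tasks at once and cancel all the futures returned by ExecutorService.submit once the max build time has expired. I ignore all the resulting CancellationExceptions since they are expected. The problem is that Future.cancel(false) behaves oddly and doesn't fit my use case: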
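For reference, here is a minimal sketch (not from the question; the class name CancelFalseDemo is hypothetical) of how Future.cancel(false) treats a task that is already running, as implemented by FutureTask: the call succeeds, the future immediately reports itself as cancelled and done, and get() throws CancellationException without waiting, yet the task body keeps running to completion.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Illustrative demo (hypothetical class): what Future.cancel(false) does to a
// task whose body is already running.
class CancelFalseDemo {

    /** Returns {cancel() result, isCancelled(), isDone(), get() threw, body finished}. */
    static boolean[] run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);

        Future<?> future = pool.submit(() -> {
            started.countDown();
            try {
                release.await(); // keeps running until the main thread lets it go
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            finished.countDown();
        });

        started.await();                             // the task is now running
        boolean cancelResult = future.cancel(false); // does not interrupt the body
        boolean getThrew = false;
        try {
            future.get(); // throws right away instead of waiting for the body
        } catch (CancellationException expected) {
            getThrew = true;
        }
        release.countDown();
        boolean bodyFinished = finished.await(5, TimeUnit.SECONDS); // body still completes
        pool.shutdown();
        return new boolean[] {cancelResult, future.isCancelled(), future.isDone(), getThrew, bodyFinished};
    }

    public static void main(String[] args) throws Exception {
        System.out.println(java.util.Arrays.toString(run())); // [true, true, true, true, true]
    }
}
```

Because of this, the returned futures cannot be used to wait for tasks that were already running when they were cancelled.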

I looked at the different blocking queues Java has to offer and found this: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/SynchronousQueue.html. That seemed ideal at first, but then, looking at https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html, it doesn't seem to play with ThreadPoolExecutor the way I want:

Direct handoffs. A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
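A minimal sketch of the rejection this Javadoc passage warns about, assuming a bounded pool (core = max = 1) and the default handler (AbortPolicy): while the only worker is busy, nobody is waiting on the SynchronousQueue, so the second execute() is rejected instead of blocking. The class name DirectHandoffDemo is illustrative.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: direct hand-off with a bounded maximumPoolSize and the default AbortPolicy.
class DirectHandoffDemo {

    static boolean secondSubmitRejected() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the only worker busy
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // creates the single worker thread
        boolean rejected = false;
        try {
            pool.execute(blocker); // no idle worker in take() and the pool is at max size
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("second task rejected: " + secondSubmitRejected()); // true
    }
}
```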

Ideally, the consumer (= the pool) would block on SynchronousQueue.poll and the producer (= the task producer thread) would block on SynchronousQueue.put.
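To make the intended semantics concrete, they can be sketched by hand without ThreadPoolExecutor. This is only an illustration (HandoffSketch and runTasks are hypothetical names); the consumers use take(), the untimed blocking counterpart of poll().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hand-rolled sketch (no ThreadPoolExecutor): consumers block in take(), and the
// producer blocks in put() until one of them is free, so nothing is ever buffered.
class HandoffSketch {

    static int runTasks(int consumers, int tasks) throws InterruptedException {
        SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
        AtomicInteger completed = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        queue.take().run(); // wait for the producer to hand over a task
                    }
                } catch (InterruptedException e) {
                    // interrupted by the producer: no more tasks
                }
            });
            worker.start();
            workers.add(worker);
        }
        for (int i = 0; i < tasks; i++) {
            queue.put(completed::incrementAndGet); // blocks until a consumer takes it
        }
        // Every put() has returned, so every task has been handed to a consumer.
        for (Thread worker : workers) {
            worker.interrupt();
        }
        for (Thread worker : workers) {
            worker.join(); // a consumer finishes its current task before take() sees the interrupt
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTasks(3, 20) + " tasks completed"); // 20 tasks completed
    }
}
```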

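Any idea how I can implement the scenario I described without writing any complex scheduling logic (which is what ThreadPoolExecutor should encapsulate for me)?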

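I believe you are on the right track... all you have to do is combine the SynchronousQueue with a RejectedExecutionHandler, using the ThreadPoolExecutor constructor that accepts both a work queue and a handler. That way you can define a fixed maximum pool size (limiting your resource usage) and a fallback mechanism that reschedules the tasks that could not be processed (because the pool was full)... Example: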

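import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Experiment {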

    public static final long HANDLER_SLEEP_TIME = 4000;
    public static final int MAX_POOL_SIZE = 1;

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Runnable> queue;
        RejectedExecutionHandler handler;
        ThreadPoolExecutor pool;
        Runnable runA, runB;

        queue   = new SynchronousQueue<>();
        handler = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
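                try {
                    System.out.println("Handler invoked! Thread: " + Thread.currentThread().getName());
                    Thread.sleep(HANDLER_SLEEP_TIME); // give runA (3 s) time to finish so the worker is free again
                    executor.execute(r);              // reschedule the rejected task (r is already a FutureTask)

                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt(); // restore the interrupt status before failing
                    throw new RuntimeException("Handler Exception!", ex);
                }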
            }
        };

        pool = new ThreadPoolExecutor(1, MAX_POOL_SIZE, 10, TimeUnit.SECONDS, queue, handler);
        runA = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    System.out.println("hello, I'm runnable A");

                } catch (Exception ex) {
                    throw new RuntimeException("RunnableA", ex);
                }
            }
        };
        runB = new Runnable() {
            @Override
            public void run() {
                System.out.println("hello, I'm runnable B");
            }
        };

        pool.submit(runA);
        pool.submit(runB);
        pool.shutdown();
    }
}

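Note: the implementation of the RejectedExecutionHandler is up to you! I just suggested a sleep as a blocking mechanism, but you can implement more complex logic, such as asking the thread pool whether it has an idle thread: if not, sleep; if so, resubmit the task...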
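One possible refinement, sketched under the assumption that the pool's work queue is a SynchronousQueue: instead of sleeping for a fixed time or polling for an idle thread, the handler retries a timed offer on the pool's own queue, which only succeeds when an idle worker is waiting in take(), and gives up once the pool is shut down. WaitForFreeWorkerHandler and demo are hypothetical names. The ThreadPoolExecutor.getQueue() Javadoc says queue access is intended primarily for debugging and monitoring, so handing tasks to the queue directly bypasses execute() and should be treated as a workaround.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler: blocks the submitting thread until an idle worker accepts
// the task, assuming the pool's work queue is a SynchronousQueue.
class WaitForFreeWorkerHandler implements RejectedExecutionHandler {
    private final long retryMillis;

    WaitForFreeWorkerHandler(long retryMillis) {
        this.retryMillis = retryMillis;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // A SynchronousQueue offer succeeds only if a worker is waiting in take().
            while (!executor.getQueue().offer(r, retryMillis, TimeUnit.MILLISECONDS)) {
                if (executor.isShutdown()) {
                    throw new RejectedExecutionException("Pool shut down while waiting for a worker");
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for a worker", e);
        }
    }

    // Runs `tasks` short tasks on a single-thread pool; returns how many completed.
    static int demo(int tasks) throws InterruptedException {
        AtomicInteger completed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new SynchronousQueue<>(), new WaitForFreeWorkerHandler(10));
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { // blocks here whenever the single worker is busy
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo(5) + " tasks completed"); // 5 tasks completed
    }
}
```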

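Besides the option proposed by @Carlitos Way, I found another one. It consists of adding tasks directly to the queue with BlockingQueue.offer. The only reason I couldn't get it to work at first, and had to post this question, is that I didn't know that a ThreadPoolExecutor starts without any threads by default. Threads are created lazily using the thread factory, and may be removed or recreated depending on the pool's core and maximum sizes and on the number of tasks submitted concurrently.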
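The lazy thread creation can be observed with getPoolSize(). In this sketch (LazyThreadsDemo is a hypothetical name, and the numbers assume a pool with 4 core threads) the pool starts with zero threads, the first execute() creates one, and prestartAllCoreThreads() creates the remaining three.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: threads are created on demand by execute(), or eagerly by prestartAllCoreThreads().
class LazyThreadsDemo {

    /** Returns {size at construction, size after one task, threads prestarted, final size}. */
    static int[] poolSizes() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
        int atConstruction = pool.getPoolSize();     // no worker threads yet
        pool.execute(() -> { });                     // the first task creates one core thread
        int afterOneTask = pool.getPoolSize();
        int started = pool.prestartAllCoreThreads(); // starts the remaining core threads
        int afterPrestart = pool.getPoolSize();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {atConstruction, afterOneTask, started, afterPrestart};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(poolSizes())); // [0, 1, 3, 4]
    }
}
```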

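Since thread creation is lazy, my attempts to block on the call to offer failed, because SynchronousQueue.offer returns immediately if nobody is waiting to take an element from the queue. Conversely, SynchronousQueue.put blocks until someone asks to take an item from the queue, which will never happen if the thread pool is empty.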
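This failure can be reproduced with a pool that has not started any thread yet: offering directly to its SynchronousQueue fails, both immediately and with a timeout, because no worker is blocked in take(). OfferWithoutTakerDemo is a hypothetical name.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: a freshly built pool has no threads, so nobody waits in take() on its queue.
class OfferWithoutTakerDemo {

    /** Returns {offer() result, timed offer() result, pool had no threads}. */
    static boolean[] offers() throws InterruptedException {
        SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, queue);
        Runnable task = () -> System.out.println("never printed");

        boolean noThreads = pool.getPoolSize() == 0;
        boolean immediate = queue.offer(task);                         // false at once
        boolean timed = queue.offer(task, 50, TimeUnit.MILLISECONDS);  // false after ~50 ms
        // queue.put(task) here would block forever: no pool thread exists to take() it.

        pool.shutdown();
        return new boolean[] {immediate, timed, noThreads};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(java.util.Arrays.toString(offers())); // [false, false, true]
    }
}
```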

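The workaround is therefore to force the thread pool to create its core threads eagerly with ThreadPoolExecutor.prestartAllCoreThreads. My problem then becomes fairly trivial. Here is a simplified version of my real use case: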

import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

public class SimplifiedBuildScheduler {
    private static final int MAX_POOL_SIZE = 10;

    private static final Random random = new Random();
    private static final AtomicLong nextTaskId = new AtomicLong(0);

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Runnable> queue = new SynchronousQueue<>();

        // this is a soft requirement in my system, not a real-time guarantee. See the complete semantics in my question.
        long maxBuildTimeInMillis = 50;
        // this timeout must be small compared to maxBuildTimeInMillis in order to accurately match the maximum build time
        long taskSubmissionTimeoutInMillis = 1;

        ThreadPoolExecutor pool = new ThreadPoolExecutor(MAX_POOL_SIZE, MAX_POOL_SIZE, 0, SECONDS, queue);
        pool.prestartAllCoreThreads();

        Runnable nextTask = makeTask(maxBuildTimeInMillis);

        long millisAtStart = System.currentTimeMillis();
        while (maxBuildTimeInMillis > System.currentTimeMillis() - millisAtStart) {
            boolean submitted = queue.offer(nextTask, taskSubmissionTimeoutInMillis, MILLISECONDS);
            if (submitted) {
                nextTask = makeTask(maxBuildTimeInMillis);
            } else {
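                // nextTaskId.get() is the ID of the *next* task to be created, i.e. one more than the ID
                // of the pending task that was just rejected, so the IDs in these messages are off by one
                System.out.println("Task " + nextTaskId.get() + " was not submitted. " + "It will be rescheduled unless " +
                        "the max build time has expired");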
            }
        }

        System.out.println("Max build time has expired. Stop submitting new tasks and running existing tasks to completion");

        pool.shutdown();
        pool.awaitTermination(9999999, SECONDS);
    }

    private static Runnable makeTask(long maxBuildTimeInMillis) {
        long sleepTimeInMillis = randomSleepTime(maxBuildTimeInMillis);
        long taskId = nextTaskId.getAndIncrement();
        return () -> {
            try {
                System.out.println("Task " + taskId + " sleeping for " + sleepTimeInMillis + " ms");
                Thread.sleep(sleepTimeInMillis);
                System.out.println("Task " + taskId + " completed !");
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            }
        };
    }

    private static int randomSleepTime(long maxBuildTimeInMillis) {
        // voluntarily make it possible that a task finishes after the max build time is expired
        return 1 + random.nextInt(2 * Math.toIntExact(maxBuildTimeInMillis));
    }
}

Here is an example of the output:

Task 1 was not submitted. It will be rescheduled unless the max build time has expired
Task 0 sleeping for 23 ms
Task 1 sleeping for 26 ms
Task 2 sleeping for 6 ms
Task 3 sleeping for 9 ms
Task 4 sleeping for 75 ms
Task 5 sleeping for 35 ms
Task 6 sleeping for 81 ms
Task 8 was not submitted. It will be rescheduled unless the max build time has expired
Task 8 was not submitted. It will be rescheduled unless the max build time has expired
Task 7 sleeping for 86 ms
Task 8 sleeping for 47 ms
Task 9 sleeping for 40 ms
Task 11 was not submitted. It will be rescheduled unless the max build time has expired
Task 2 completed !
Task 10 sleeping for 76 ms
Task 12 was not submitted. It will be rescheduled unless the max build time has expired
Task 3 completed !
Task 11 sleeping for 31 ms
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 0 completed !
Task 12 sleeping for 7 ms
Task 14 was not submitted. It will be rescheduled unless the max build time has expired
Task 14 was not submitted. It will be rescheduled unless the max build time has expired
Task 1 completed !
Task 13 sleeping for 40 ms
Task 15 was not submitted. It will be rescheduled unless the max build time has expired
Task 12 completed !
Task 14 sleeping for 93 ms
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 5 completed !
Task 15 sleeping for 20 ms
Task 17 was not submitted. It will be rescheduled unless the max build time has expired
Task 17 was not submitted. It will be rescheduled unless the max build time has expired
Task 11 completed !
Task 16 sleeping for 27 ms
Task 18 was not submitted. It will be rescheduled unless the max build time has expired
Task 18 was not submitted. It will be rescheduled unless the max build time has expired
Task 9 completed !
Task 17 sleeping for 95 ms
Task 19 was not submitted. It will be rescheduled unless the max build time has expired
Max build time has expired. Stop submitting new tasks and running existing tasks to completion
Task 8 completed !
Task 15 completed !
Task 13 completed !
Task 16 completed !
Task 4 completed !
Task 6 completed !
Task 10 completed !
Task 7 completed !
Task 14 completed !
Task 17 completed !

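You'll notice, for example, that the task logged as "Task 19" was not rescheduled, because the max build time expired before the scheduler could try to offer it to the queue a second time (since that message prints nextTaskId.get(), the pending task is actually task 18, which indeed never appears as sleeping). You can also see that all in-progress tasks that started before the max build time expired ran to completion.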

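Note: as I pointed out in a comment in the code, the max build time is a soft requirement, meaning it may not be met exactly: my solution does allow a task to be submitted even after the max build time has expired. This happens if a call to offer starts just before the max build time expires and completes after it. To reduce the chances of this happening, the timeout used when calling offer must be much smaller than the max build time. In the real system, the thread pool is usually busy with no idle thread, so the probability of this race condition is extremely small, and even when it happens it has no harmful consequences, because the max build time is a best-effort target for the overall running time rather than a strict limit.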
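If the race described above matters, one possible mitigation (an assumption, not part of the original solution) is to cap each offer timeout at the time remaining before the deadline and to measure time with System.nanoTime(), which is intended for measuring elapsed time, unlike System.currentTimeMillis(). This narrows the window to scheduling latency but does not close it, and a task handed off just before the deadline may still run past it. DeadlineCappedSubmitter and submitUntil are hypothetical names.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of the submission loop: each offer waits at most until the deadline.
class DeadlineCappedSubmitter {

    /** Offers `task` repeatedly until the deadline; returns how many hand-offs succeeded. */
    static int submitUntil(SynchronousQueue<Runnable> queue, Runnable task,
                           long maxBuildTimeMillis, long offerTimeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxBuildTimeMillis);
        long offerTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(offerTimeoutMillis);
        int submitted = 0;
        while (true) {
            long remaining = deadline - System.nanoTime(); // overflow-safe nanoTime comparison
            if (remaining <= 0) {
                return submitted;
            }
            // Never wait longer than the time left before the deadline.
            if (queue.offer(task, Math.min(offerTimeoutNanos, remaining), TimeUnit.NANOSECONDS)) {
                submitted++;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, queue);
        pool.prestartAllCoreThreads();
        AtomicInteger ran = new AtomicInteger();
        Runnable task = () -> {
            try {
                Thread.sleep(5); // simulated build step
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            ran.incrementAndGet();
        };
        int submitted = submitUntil(queue, task, 50, 1);
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(submitted + " tasks submitted, " + ran.get() + " completed");
    }
}
```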