是否有带多个队列的开箱即用的线程池(确保每个队列的串行处理)?

Is there an out-of-the-box thread pool with multiple queues (that ensure serial processing of each queue)?

在我所有的任务中,我有一些必须顺序处理(它们永远不能 运行 同时处理,它们必须按顺序处理)。

我实现了为每组必须串行执行的任务创建一个单独的线程池,其中包含一个线程。它有效,但我没有相应的资源。我不控制组的数量,所以我最终可能会同时处理大量的线程 运行ning。

有什么方法可以用单个线程池完成吗?是否有一个具有多个阻塞队列的线程池,我可以确保每个队列的串行执行?

编辑:

只是强调我在第二段中所说的内容:我已经为必须连续执行的每组任务使用单线程线程池解决了这个问题。不过,我无法继续使用此解决方案。群太多了,我发不完。

我找到了这个相关问题,但由于不是最近,我还是创建了我的问题。我所做的只是试图避免重新发明轮子,但我似乎别无选择。

Does Java have an indexable multi-queue thread pool?

查看 Java 的内置线程执行器服务。

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html

有一个单线程执行器将同步处理每个任务。

回复评论区:

请先阅读 API,然后再说这行不通。
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor()

public static ExecutorService newSingleThreadExecutor() 创建一个 Executor,它使用单个工作线程在无界队列中运行。 (但是请注意,如果此单个线程在关闭之前的执行过程中因故障而终止,则如果需要执行后续任务,一个新线程将取代它。)保证任务按顺序执行,并且不会有多个任务处于活动状态在任何给定时间。与其他等效的 newFixedThreadPool(1) 不同,返回的执行程序保证不能重新配置以使用其他线程。

注意:是保证按顺序执行的状态。

编辑:

现在我更好地理解了你的问题,我有一个你可以尝试的想法。如果为每个组维护一个队列,则可以从每个队列中提取项目并将它们送入线程池。下面的代码不会优先考虑任何一个组,它只是以轮抢的方式拉动它们。如果您需要添加优先级,您应该很容易就能做到。下面的代码将使用两个线程(加上管理队列的线程)轮抢 4 个组。您可以使用另一种队列机制。我通常将 LinkedBlockingQueue 用于我想等待另一个线程将项目放入队列的情况,这可能不是您想要的 - 这就是我轮询而不是调用 take() 的原因。 Take 是等待的调用。

private Future group1Future = null;
private Future group2Future = null;
private Future group3Future = null;
private Future group4Future = null;
private LinkedBlockingQueue<Callable> group1Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group2Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group3Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group4Queue
        = new LinkedBlockingQueue<>();

private ExecutorService executor = Executors.newFixedThreadPool(2);


public void startProcessing() {
    while (true) {
        if (group1Future != null && group1Future.isDone()) {
            if (group1Queue.peek() != null) {
                group1Future = executor.submit(group1Queue.poll());
            }
        }
        if (group2Future != null && group1Future.isDone()) {
            if (group2Queue.peek() != null) {
                group2Future = executor.submit(group2Queue.poll());
            }
        }
        if (group3Future != null && group3Future.isDone()) {
            if (group3Queue.peek() != null) {
                group3Future = executor.submit(group3Queue.poll());
            }
        }

        if (group4Future != null && group4Future.isDone()) {
            if (group4Queue.peek() != null) {
                group4Future = executor.submit(group4Queue.poll());
            }
        }
    }
}

如果该组的任务未完成,它将跳到下一组。一次不会处理超过两个组,并且任何一个组都不会 运行 超过一项任务。队列将强制执行有序执行。

单线程执行器就可以了

ExecutorService  executorService = Executors.newSingleThreadExecutor();

它在内部使用带有 LinkedBlockingQueue

的 ThreadPoolExecutor
new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()))

所以您可以将它用于您的连续任务,并可能使用 multi-threaded 执行器服务来处理并发任务

如果您为每个组维护一个队列,则可以从每个队列中提取项目并将它们送入线程池。下面的代码不会优先考虑任何一组,它只是以循环方式拉动它们。如果您需要添加优先级,您应该很容易就能做到。以下代码将使用两个线程(加上管理队列的线程)循环 4 个组。您可以使用另一种队列机制。我通常将 LinkedBlockingQueue 用于我想等待另一个线程将项目放入队列的情况,这可能不是您想要的 - 所以我正在轮询而不是调用 take()。 Take 是等待的调用。

private Future group1Future = null;
private Future group2Future = null;
private Future group3Future = null;
private Future group4Future = null;
private LinkedBlockingQueue<Callable> group1Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group2Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group3Queue
        = new LinkedBlockingQueue<>();
private LinkedBlockingQueue<Callable> group4Queue
        = new LinkedBlockingQueue<>();

private ExecutorService executor = Executors.newFixedThreadPool(2);


public void startProcessing() {
    while (true) {
        if (group1Future != null && group1Future.isDone()) {
            if (group1Queue.peek() != null) {
                group1Future = executor.submit(group1Queue.poll());
            }
        }
        if (group2Future != null && group1Future.isDone()) {
            if (group2Queue.peek() != null) {
                group2Future = executor.submit(group2Queue.poll());
            }
        }
        if (group3Future != null && group3Future.isDone()) {
            if (group3Queue.peek() != null) {
                group3Future = executor.submit(group3Queue.poll());
            }
        }

        if (group4Future != null && group4Future.isDone()) {
            if (group4Queue.peek() != null) {
                group4Future = executor.submit(group4Queue.poll());
            }
        }
    }
}

如果该组的任务未完成,它将跳到下一组。一次不会处理超过两个组,并且任何一个组都不会 运行 超过一项任务。队列将强制执行有序执行。

你需要的不是一个特殊的执行器,而是表达任务之间依赖关系的手段。代替必须连续执行的一组任务,考虑一个任务,该任务在执行结束时向下一个任务发送信号,从而开始执行。因此,您的任务可以编码为等待允许信号开始执行的参与者。考虑 Akka 或任何其他 actor 库(例如我的 df4j)。

Akka, as suggested by @SotiriosDelimanolis and @AlexeiKaigorodov seems promising, as well as ,肯定能解决问题。唯一的缺点是我必须编写自己的轮询策略以确保我的任务最终被添加到执行程序(就像他的示例中的无限循环)。

另一方面,@OldCurmudgeon 建议的 Striped Executor Service 完全符合我的问题,开箱即用,就像自定义 ExecutorService

This magical thread pool would ensure that all Runnables with the same stripeClass would be executed in the order they were submitted, but StripedRunners with different stripedClasses could still execute independently. He wanted to use a relatively small thread pool to service a large number of Java NIO clients, but in such a way that the runnables would still be executed in-order.

甚至还有关于为每个组(条带)使用单线程线程池的评论,正如此处所建议的那样:

Several suggestions were made, such as having a SingleThreadExecutor for each stripeClass. However, that would not satisfy the requirement that we could share the threads between connections.

我认为这是简单易用的最佳解决方案。

我最近回答了一个关于 "serial task queue" 的问题,其中包含一个基本实现作为演示 。我想您一直在使用类似的解决方案。调整实现以使用任务列表映射并仍然共享一个(固定大小)执行程序相对容易。
的 Striped Executor Service 是更好的解决方案,但我在这里展示了经过调整的实现,以演示将任务队列与执行程序解耦。该实现使用回调,因此无需进行轮询或发信号。由于使用了 "critical (stop the world) section",带有任务队列的地图可以自行清理:没有任务排队意味着空地图。 "critical section" 的缺点是吞吐量有限:每秒只能添加和删除这么多任务。

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Copied and updated from 
public class SerialTaskQueues {

    public static void main(String[] args) {

        // test the serial task execution using different groups
        ExecutorService executor = Executors.newFixedThreadPool(2);
        SerialTaskQueues tq = new SerialTaskQueues(executor);
        try {
            // test running the tasks one by one
            tq.add(new SleepSome("1", 30L));
            Thread.sleep(5L);
            tq.add(new SleepSome("2", 20L));
            tq.add(new SleepSome("1", 10L));

            Thread.sleep(100L);
            // all queues should be empty
            System.out.println("Queue size 1: " + tq.size("1")); // should be empty
            System.out.println("Queue size 2: " + tq.size("2")); // should be empty
            tq.add(new SleepSome("1", 10L));
            tq.add(new SleepSome("2", 20L));
            // with executor pool size set to 2, task 3 will have to wait for task 1 to complete
            tq.add(new SleepSome("3", 30L));
            tq.add(new SleepSome("1", 20L));
            tq.add(new SleepSome("2", 10L));

            Thread.sleep(100L);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdownNow();
        }
    }

    // all lookups and modifications to the list must be synchronized on the list.
    private final Map<String, GroupTasks> taskGroups = new HashMap<>();
    // make lock fair so that adding and removing tasks is balanced.
    private final ReentrantLock lock = new ReentrantLock(true);
    private final ExecutorService executor;

    public SerialTaskQueues(ExecutorService executor) {
        this.executor = executor;
    }

    public boolean add(String groupId, Runnable task) {

        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            if (gt == null) {
                gt = new GroupTasks(groupId);
                taskGroups.put(groupId, gt);
            }
            gt.tasks.add(task); 
        } finally {
            lock.unlock();
        }
        runNextTask(groupId);
        return true;
    }

    /* Utility method for testing. */
    public void add(SleepSome sleepTask) {
        add(sleepTask.groupId, sleepTask);
    }

    private void runNextTask(String groupId) {

        // critical section that ensures one task is executed.
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            if (gt.tasks.isEmpty()) {
                // only cleanup when last task has executed, prevent memory leak
                if (!gt.taskRunning.get()) {
                    taskGroups.remove(groupId);
                }
            } else if (!executor.isShutdown() && gt.taskRunning.compareAndSet(false, true)) {
                executor.execute(wrapTask(groupId, gt.taskRunning, gt.tasks.remove(0)));
            }
        } finally {
            lock.unlock();
        }
    }

    private CallbackTask wrapTask(final String groupId, final AtomicBoolean taskRunning, Runnable task) {

        return new CallbackTask(task, new Runnable() {
            @Override 
            public void run() {
                if (!taskRunning.compareAndSet(true, false)) {
                    System.out.println("ERROR: programming error, the callback should always run in execute state.");
                }
                runNextTask(groupId);
            }
        });
    }

    /** Amount of (active) task groups. */
    public int size() {

        int size = 0;
        lock.lock();
        try {
            size = taskGroups.size();
        } finally {
            lock.unlock();
        }
        return size;
    }

    public int size(String groupId) {

        int size = 0;
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            size = (gt == null ? 0 : gt.tasks.size());
        } finally {
            lock.unlock();
        }
        return size;
    }

    public Runnable get(String groupId, int index) {

        Runnable r = null;
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            r =  (gt == null ? null : gt.tasks.get(index));
        } finally {
            lock.unlock();
        }
        return r;
    }

    public Runnable remove(String groupId, int index) {

        Runnable r = null;
        lock.lock();
        try {
            GroupTasks gt = taskGroups.get(groupId);
            r = gt.tasks.remove(index);
            // similar to runNextTask - cleanup if there are no tasks (running) for the group 
            if (gt.tasks.isEmpty() && !gt.taskRunning.get()) {
                taskGroups.remove(groupId);
            }
        } finally {
            lock.unlock();
        }
        return r;
    }

    /* Helper class for the task-group map. */
    class GroupTasks {

        final List<Runnable> tasks = new LinkedList<Runnable>();
        // atomic boolean used to ensure only 1 task is executed at any given time
        final AtomicBoolean taskRunning = new AtomicBoolean(false);
        final String groupId;

        GroupTasks(String groupId) {
            this.groupId = groupId;
        }
    }

    // general callback-task, see 
    static class CallbackTask implements Runnable {

        private final Runnable task, callback;

        public CallbackTask(Runnable task, Runnable callback) {
            this.task = task;
            this.callback = callback;
        }

        @Override 
        public void run() {

            try {
                task.run();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    callback.run();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // task that just sleeps for a while
    static class SleepSome implements Runnable {

        static long startTime = System.currentTimeMillis();

        private final String groupId;
        private final long sleepTimeMs;
        public SleepSome(String groupId, long sleepTimeMs) {
            this.groupId = groupId;
            this.sleepTimeMs = sleepTimeMs;
        }
        @Override public void run() {
            try { 
                System.out.println(tdelta(groupId) + "Sleeping for " + sleepTimeMs + " ms.");
                Thread.sleep(sleepTimeMs);
                System.out.println(tdelta(groupId) + "Slept for " + sleepTimeMs + " ms.");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private String tdelta(String groupId) { return String.format("% 4d [%s] ", (System.currentTimeMillis() - startTime), groupId); }
    }
}

没有满足这些要求的线程池的标准实现。

接受的答案中提到的 Striped Executor Service 是一个很好的替代品。

我看到的缺点是:多个队列(无法限制队列容量,或维护提交顺序),每个条带线程(如果你有很多条带,你的线程池会增长)。

我决定用单个队列创建类似的实现:
GitHub - TaggedThreadPoolExecutor.java
它实现了标准的ExecutorService接口,维护单队列,以最大线程数为参数,支持不同的拒绝策略(类似于标准的ThreadPoolExecutor),不像ThreadPoolExecutor,它不是在队列已满时启动新线程,而是在提交新任务时启动新线程。