按顺序执行任务但从池中获取线程的 ExecutorService

ExecutorService that executes tasks sequentially but takes threads from a pool

我正在尝试构建 ExecutorService 的实现,我们称之为 SequentialPooledExecutor,具有以下属性。

  1. SequentialPooledExecutor的所有实例共享同一个线程池

  2. SequentialPooledExecutor 的同一实例的调用按顺序执行。


我目前正在自己​​实施 SequentialPooledExecutor,但我想知道我是否在重新发明轮子。我查看了 ExecutorService 的不同实现,例如 Executors class 提供的实现,但没有找到满足我要求的实现。




假设我有一系列会话,假设有 1000 个(我之前调用执行程序实例的东西)。我可以将任务提交到会话,并且我希望保证提交到同一会话的所有任务都按顺序执行。但是,属于不同会话的任务不应相互依赖。

我想定义一个 ExecutorService 来执行这些任务,但使用有限数量的线程,比方说 200,但要确保任务不会在同一会话中的前一个任务完成之前启动.

我不知道是否有任何现有的东西已经这样做了,或者我是否应该自己实施这样的 ExecutorService

如果您想按顺序执行任务,只需创建一个 ExecutorService with only one thread thanks to Executors.newSingleThreadExecutor().

如果您有不同类型的任务并且只想按顺序执行相同类型的任务,您可以使用相同的 单线程 ExecutorService 相同类型的任务,无需重新发明轮子。

假设您有 1 000 不同类型的任务,您可以使用 200 单线程 ExecutorService,您唯一需要自己实现的是对于给定类型的任务,您始终需要使用相同的单线程 ExecutorService


ExecutorService[] es = // many single threaded executors

public <T> Future<T> submit(String key, Callable<T> calls) {
    int h = Math.abs(key.hashCode() % es.length);
    return es[h].submit(calls);

一般来说,你只需要 2 * N 个线程来保持 N 个核心忙碌,如果你的任务是 CPU 绑定的,那么更多只会增加开销。

@Nicolas 的回答可能是您最好的选择,因为它简单、经过充分测试且高效。


  1. 不要将 "SequentialPooledExecutor" 设为执行器服务,将其设为 "pool" 单线程执行器服务的外观
  2. 让你的 "SequentialPooledExecutor" 实现一个提交方法(接受一个 Runnable / Callable,和一个代表 "queue name" 的字符串),returns 一个 Future,就像一个执行服务
  3. 调用此方法时,通过获取队列名称的哈希并将其分派到相应的内部执行程序,使您的 "SequentialPooledExecutor" 分派到其内部单线程执行程序服务之一。

在步骤 3 中发生的散列部分允许您让每个 "queue name" 的任务始终转到 "SequentialPooledExecutor".[=15 内的相同(单线程)执行程序服务=]

另一种可能的方法是使用 CompletionStageCompletableFutures. These are, in effect, listenable futures (that have a completion handler). With these, the first time you have a "session", you create a CompletableFuture with your first task, and hold on to it. At each new task, you combine the previous future with the new task, calling thenAcceptAsync(或任何类似的方法)。你得到的是一个线性的执行任务链。

private Map<Integer, CompletableFuture<Void>> sessionTasks = new HashMap<>();
private ExecutorService pool = Executors.newFixedThreadPool(200);

public void submit(int sessionId, Runnable task) {  
    if (sessionTasks.containsKey(sessionId)) {
        sessionTasks.compute(sessionId, (i, c) -> c.thenRunAsync(task, pool));
    } else {
        sessionTasks.put(sessionId, CompletableFuture.runAsync(task, pool));



ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)

对于您的用例,将 ThreadPoolExecutor 用作

ThreadPoolExecutor executor =    
ThreadPoolExecutor(1,1,60,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(1000));

以上队列的代码上限为ThreadPoolExecutor 1000。如果要使用自定义拒绝执行处理程序,可以配置RejectedExeutionHandler

相关 SE 问题:

最近遇到了同样的问题。 没有内置的 class ,但是队列足够近了。 我的简单实现如下所示(也许对其他人在同一问题上寻找示例有帮助)

public class SerializedAsyncRunnerSimple implements Runnable {
private final ExecutorService pool;
protected final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); //thread safe queue
protected final AtomicBoolean active = new AtomicBoolean(false);

public SerializedAsyncRunnerSimple(ExecutorService threadPool) {this.pool = threadPool;}

public void addWork(Runnable r){        

private void startExecutionIfInactive() {
    if(active.compareAndSet(false, true)) {

public synchronized void run() {
    active.set(false); //all future adds will not be executed on this thread anymore
    if(!queue.isEmpty()) { //if some items were added to the queue after the last queue.poll
        startExecutionIfInactive();// trigger an execution on next thread