按顺序执行任务但从池中获取线程的 ExecutorService

ExecutorService that executes tasks sequentially but takes threads from a pool

我正在尝试构建 ExecutorService 的实现,我们称之为 SequentialPooledExecutor,具有以下属性。

  1. SequentialPooledExecutor的所有实例共享同一个线程池

  2. SequentialPooledExecutor 的同一实例的调用按顺序执行。

换句话说,实例在开始处理其队列中的下一个任务之前等待当前正在执行的任务终止。

我目前正在自己​​实施 SequentialPooledExecutor,但我想知道我是否在重新发明轮子。我查看了 ExecutorService 的不同实现,例如 Executors class 提供的实现,但没有找到满足我要求的实现。

你知道我是否缺少现有的实现,或者我应该继续自己实现接口吗?

编辑:

我觉得我的需求不是很清楚,我看看能不能换句话解释一下。

假设我有一系列会话,假设有 1000 个(我之前调用执行程序实例的东西)。我可以将任务提交到会话,并且我希望保证提交到同一会话的所有任务都按顺序执行。但是,属于不同会话的任务不应相互依赖。

我想定义一个 ExecutorService 来执行这些任务,但使用有限数量的线程,比方说 200,但要确保任务不会在同一会话中的前一个任务完成之前启动.

我不知道是否有任何现有的东西已经这样做了,或者我是否应该自己实施这样的 ExecutorService

如果您想按顺序执行任务,只需创建一个 ExecutorService with only one thread thanks to Executors.newSingleThreadExecutor().

如果您有不同类型的任务并且只想按顺序执行相同类型的任务,您可以使用相同的 单线程 ExecutorService 相同类型的任务,无需重新发明轮子。

假设您有 1 000 不同类型的任务,您可以使用 200 单线程 ExecutorService,您唯一需要自己实现的是对于给定类型的任务,您始终需要使用相同的单线程 ExecutorService

如果您有数千个必须按顺序处理的密钥,但您没有数千个核心,您可以使用散列策略来分配这样的工作

ExecutorService[] es = // many single threaded executors

public <T> Future<T> submit(String key, Callable<T> calls) {
    int h = Math.abs(key.hashCode() % es.length);
    return es[h].submit(calls);
}

一般来说,你只需要 2 * N 个线程来保持 N 个核心忙碌,如果你的任务是 CPU 绑定的,那么更多只会增加开销。

@Nicolas 的回答可能是您最好的选择,因为它简单、经过充分测试且高效。

如果不能满足你的要求,我会这样做:

  1. 不要将 "SequentialPooledExecutor" 设为执行器服务,将其设为 "pool" 单线程执行器服务的外观
  2. 让你的 "SequentialPooledExecutor" 实现一个提交方法(接受一个 Runnable / Callable,和一个代表 "queue name" 的字符串),returns 一个 Future,就像一个执行服务
  3. 调用此方法时,通过获取队列名称的哈希并将其分派到相应的内部执行程序,使您的 "SequentialPooledExecutor" 分派到其内部单线程执行程序服务之一。

在步骤 3 中发生的散列部分允许您让每个 "queue name" 的任务始终转到 "SequentialPooledExecutor".[=15 内的相同(单线程)执行程序服务=]

另一种可能的方法是使用 CompletionStageCompletableFutures. These are, in effect, listenable futures (that have a completion handler). With these, the first time you have a "session", you create a CompletableFuture with your first task, and hold on to it. At each new task, you combine the previous future with the new task, calling thenAcceptAsync(或任何类似的方法)。你得到的是一个线性的执行任务链。

private Map<Integer, CompletableFuture<Void>> sessionTasks = new HashMap<>();
private ExecutorService pool = Executors.newFixedThreadPool(200);

public void submit(int sessionId, Runnable task) {  
    if (sessionTasks.containsKey(sessionId)) {
        sessionTasks.compute(sessionId, (i, c) -> c.thenRunAsync(task, pool));
    } else {
        sessionTasks.put(sessionId, CompletableFuture.runAsync(task, pool));
    }
}

如果会话没有任务,则会在提供的池中创建一个新任务并运行。如果在添加新任务时会话已经有任务,则后者将链接(thenRun)到前一个任务,以确保顺序。

如果要配置有界队列,使用ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)

对于您的用例,将 ThreadPoolExecutor 用作

ThreadPoolExecutor executor =    
ThreadPoolExecutor(1,1,60,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(1000));

以上队列的代码上限为ThreadPoolExecutor 1000。如果要使用自定义拒绝执行处理程序,可以配置RejectedExeutionHandler

相关 SE 问题:

最近遇到了同样的问题。 没有内置的 class ,但是队列足够近了。 我的简单实现如下所示(也许对其他人在同一问题上寻找示例有帮助)

public class SerializedAsyncRunnerSimple implements Runnable {
private final ExecutorService pool;
protected final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); //thread safe queue
protected final AtomicBoolean active = new AtomicBoolean(false);


public SerializedAsyncRunnerSimple(ExecutorService threadPool) {this.pool = threadPool;}


public void addWork(Runnable r){        
    queue.add(r);
    startExecutionIfInactive();
}

private void startExecutionIfInactive() {
    if(active.compareAndSet(false, true)) {
        pool.execute(this);
    }
}

@Override
public synchronized void run() {
    while(!queue.isEmpty()){
        queue.poll().run();
    }
    active.set(false); //all future adds will not be executed on this thread anymore
    if(!queue.isEmpty()) { //if some items were added to the queue after the last queue.poll
        startExecutionIfInactive();// trigger an execution on next thread
    }
}