具有每个请求配额的线程池执行器?

Thread pool executor with per-request quotas?

我有一个 Web 应用程序,其中一个请求可能会调用多个作业,这些作业必须 运行 并行。作业涉及网络通信,因此它们不完全是 CPU 绑定;此类工作的估计持续时间在几秒到几分钟之间。

显而易见的解决方案是将作业执行委托给专用线程池。但是,我想避免这样一种情况,即一个客户(即一个请求)占用了大部分池来完成自己的工作,而没有给其他客户留下任何东西。同时,创建工作的请求不应阻塞;它应该立即提交其作业以供执行 return。本质上,我正在寻找的是根据一些自定义标准设置配额的能力,例如,不超过 20 个匹配的工作将同时 运行ning,即使已经提交了更多线程池。可能有什么开箱即用的东西吗?

另一种方法当然是为每个请求创建一个新的固定线程池实例,将作业提交给它然后关闭,但感觉这不是正确的方法。

我想不出开箱即用的东西。但是,前段时间我实施了一个解决类似问题的解决方案,可以对其进行调整。

举个例子,假设您想要一个总共有 200 个可用线程的线程池,但您只想允许 10 个线程使用单个标识符(在您的情况下为:request-id)。

您可以做的是创建 20 个独立的线程池,每个线程池有 10 个线程。然后有一些简单的逻辑将任务转发到这些线程池之一。它可以完成,即基于 hash-of-task-id%number-of-threadpools.

这将保证来自一个请求(或至少具有相同请求 ID)的所有任务都将转到一个特定的执行者。

当然这里有一些假设(即你的 id 的散列必须分布良好,这样当你得到它的模时,它会在线程池中均匀分布 id)和风险(即来自多个请求的任务可能将去一个池,而其他池可能是空的)。

如果您需要更通用的解决方案,恐怕需要更多的实现。

这是上述解决方案的代码:

public class IdAffiliatiedThreadPool {

    private final int executorsCnt;
    private final ExecutorService[] executors;
    
    public IdAffiliatiedThreadPool(String name, int executorsCnt, int threadsPerExecutor) {
        this.executorsCnt = executorsCnt;
        this.executors = new ExecutorService[executorsCnt];
        
        for(int i = 0; i < executorsCnt; i++) {
            this.executors[i] = Executors.newFixedThreadPool(threadsPerExecutor);
        }
    }
    
    public void execute(Object id, Runnable action) {
        int threadId = id.hashCode() % executorsCnt;
        this.executors[threadId].execute(action);
    }
}