执行者服务和速率限制器

Executor Service and Rate Limiter

我有一个 class 调用第 3 方 API。现在我正在努力实施,一次只能向第 3 方发送 10 个请求 api。

我想到了使用具有固定线程池的执行程序服务,如下所示。

public class SimpleRateLimiter {

    private ExecutorService executorService = Executors.newFixedThreadPool(10);
    private static SimpleRateLimiter srl = new SimpleRateLimiter();

    private SimpleRateLimiter() {}

    public static SimpleRateLimiter getInstance() {
        return srl;
    }

    public void doSomething() {
        executorService.submit(new Runnable() {         
            @Override
            public void run() {

                // call to 3rd party api
            }
        });
    }

    public void terminate() throws Exception {
        executorService.shutdown();
    }

}

我的理解是,因为我在池中只有 10 个工作线程,所以在任何时候都只能使用上面的代码向第 3 方 API 发出 10 个请求.但这并不意味着 Executor 服务将拒绝所有其他请求。相反,它会接受所有请求,并在完成正在处理的任务后将请求分配给工作线程。

Is the above understanding correct?

我想使用 [guava] (https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html) 库来完成您的工作会更容易。

是的,你的理解是正确的,如果你看一下 Executors.newFixedThreadPool() 的实现,它 returns 是 ThreadPoolExecutor 的一个实例,带有 unbounded BlockingQueue 实施:

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
 }

LinkedBlockingQueue 是一个可选的有界 BlockingQueue 实现,您可以忽略它的构造函数的 capacity 参数,如果您不提供任何东西,队列的最大大小是 Integer.MAX_VALUE:

public LinkedBlockingQueue() {
       this(Integer.MAX_VALUE);
}

所以关于你的问题所有任务都将提交给线程池并且一次只有 10 个线程 运行 并调用 API,剩下的要排队。

相比之下,如果您使用自定义 ThreadPoolExecutor 和有界 BlockingQueue 实现,例如 ArrayBlockingQueue(具有预定义的容量)而不是 LinkedBlockingQueue 在这种情况下,如果所有线程都忙,队列已满,您尝试提交另一个任务,该任务将被 rejected.

在您的代码中,executorService.submit 将继续接受新任务(直到 Integer.MAX_VALUE 个任务),但在给定时刻只有 10 个线程 运行。