执行者服务和速率限制器
Executor Service and Rate Limiter
我有一个 class 调用第 3 方 API。现在我正在努力实施,一次只能向第 3 方发送 10 个请求 api。
我想到了使用具有固定线程池的执行程序服务,如下所示。
public class SimpleRateLimiter {
private ExecutorService executorService = Executors.newFixedThreadPool(10);
private static SimpleRateLimiter srl = new SimpleRateLimiter();
private SimpleRateLimiter() {}
public static SimpleRateLimiter getInstance() {
return srl;
}
public void doSomething() {
executorService.submit(new Runnable() {
@Override
public void run() {
// call to 3rd party api
}
});
}
public void terminate() throws Exception {
executorService.shutdown();
}
}
我的理解是,因为我在池中只有 10 个工作线程,所以在任何时候都只能使用上面的代码向第 3 方 API 发出 10 个请求.但这并不意味着 Executor 服务将拒绝所有其他请求。相反,它会接受所有请求,并在完成正在处理的任务后将请求分配给工作线程。
Is the above understanding correct?
我想使用 [guava] (https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html) 库来完成您的工作会更容易。
是的,你的理解是正确的,如果你看一下 Executors.newFixedThreadPool()
的实现,它 returns 是 ThreadPoolExecutor
的一个实例,带有 unbounded BlockingQueue
实施:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
LinkedBlockingQueue
是一个可选的有界 BlockingQueue
实现,您可以忽略它的构造函数的 capacity 参数,如果您不提供任何东西,队列的最大大小是 Integer.MAX_VALUE
:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
所以关于你的问题所有任务都将提交给线程池并且一次只有 10 个线程 运行 并调用 API,剩下的要排队。
相比之下,如果您使用自定义 ThreadPoolExecutor
和有界 BlockingQueue
实现,例如 ArrayBlockingQueue
(具有预定义的容量)而不是 LinkedBlockingQueue
在这种情况下,如果所有线程都忙,队列已满,您尝试提交另一个任务,该任务将被 rejected.
在您的代码中,executorService.submit
将继续接受新任务(直到 Integer.MAX_VALUE
个任务),但在给定时刻只有 10 个线程 运行。
我有一个 class 调用第 3 方 API。现在我正在努力实施,一次只能向第 3 方发送 10 个请求 api。
我想到了使用具有固定线程池的执行程序服务,如下所示。
public class SimpleRateLimiter {
private ExecutorService executorService = Executors.newFixedThreadPool(10);
private static SimpleRateLimiter srl = new SimpleRateLimiter();
private SimpleRateLimiter() {}
public static SimpleRateLimiter getInstance() {
return srl;
}
public void doSomething() {
executorService.submit(new Runnable() {
@Override
public void run() {
// call to 3rd party api
}
});
}
public void terminate() throws Exception {
executorService.shutdown();
}
}
我的理解是,因为我在池中只有 10 个工作线程,所以在任何时候都只能使用上面的代码向第 3 方 API 发出 10 个请求.但这并不意味着 Executor 服务将拒绝所有其他请求。相反,它会接受所有请求,并在完成正在处理的任务后将请求分配给工作线程。
Is the above understanding correct?
我想使用 [guava] (https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html) 库来完成您的工作会更容易。
是的,你的理解是正确的,如果你看一下 Executors.newFixedThreadPool()
的实现,它 returns 是 ThreadPoolExecutor
的一个实例,带有 unbounded BlockingQueue
实施:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
LinkedBlockingQueue
是一个可选的有界 BlockingQueue
实现,您可以忽略它的构造函数的 capacity 参数,如果您不提供任何东西,队列的最大大小是 Integer.MAX_VALUE
:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
所以关于你的问题所有任务都将提交给线程池并且一次只有 10 个线程 运行 并调用 API,剩下的要排队。
相比之下,如果您使用自定义 ThreadPoolExecutor
和有界 BlockingQueue
实现,例如 ArrayBlockingQueue
(具有预定义的容量)而不是 LinkedBlockingQueue
在这种情况下,如果所有线程都忙,队列已满,您尝试提交另一个任务,该任务将被 rejected.
在您的代码中,executorService.submit
将继续接受新任务(直到 Integer.MAX_VALUE
个任务),但在给定时刻只有 10 个线程 运行。