执行器线程池 - 限制队列大小和最旧的出队
Executor Thread Pool - limit queue size and dequeue oldest
我在 spring 引导应用程序中为生成消息的消费者使用固定线程池。我的生产者生产(很多)快于生产者能够处理消息的速度,因此线程池的队列似乎是 "flooding"。
限制队列大小的最佳方法是什么?预期的队列行为将是 "if the queue is full, remove the head and insert the new Runnable"。是否可以这样配置Executors线程池?
ThreadPoolExecutor
通过 ThreadPoolExecutor.DiscardOldestPolicy
:
支持此功能
A handler for rejected tasks that discards the oldest unhandled
request and then retries execute, unless the executor is shut down, in
which case the task is discarded.
您需要使用此策略手动构建池,例如:
int poolSize = ...;
int queueSize = ...;
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(queueSize),
handler);
这将为您创建一个您传递的大小的线程池。
ExecutorService service = Executors.newFixedThreadPool(THREAD_SIZE);
这在内部创建了一个 ThreadPoolExecutor 实例,它实现了 ExecutorService。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
要创建自定义广告池,您可以这样做。
ExecutorService service = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(10));
这里我们可以指定队列的大小,使用LinkedBlockingQueue的重载构造函数。
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
希望这对您有所帮助。干杯!!!
例如,如果您使用一次能够连接 100 个连接的数据库 (psql)。任务可能需要 2000 毫秒 ...
int THREADS = 50;
ExecutorService exe = new ThreadPoolExecutor(THREADS,
50,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.CallerRunsPolicy()); ```
我在 spring 引导应用程序中为生成消息的消费者使用固定线程池。我的生产者生产(很多)快于生产者能够处理消息的速度,因此线程池的队列似乎是 "flooding"。
限制队列大小的最佳方法是什么?预期的队列行为将是 "if the queue is full, remove the head and insert the new Runnable"。是否可以这样配置Executors线程池?
ThreadPoolExecutor
通过 ThreadPoolExecutor.DiscardOldestPolicy
:
A handler for rejected tasks that discards the oldest unhandled request and then retries execute, unless the executor is shut down, in which case the task is discarded.
您需要使用此策略手动构建池,例如:
int poolSize = ...;
int queueSize = ...;
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(queueSize),
handler);
这将为您创建一个您传递的大小的线程池。
ExecutorService service = Executors.newFixedThreadPool(THREAD_SIZE);
这在内部创建了一个 ThreadPoolExecutor 实例,它实现了 ExecutorService。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
要创建自定义广告池,您可以这样做。
ExecutorService service = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(10));
这里我们可以指定队列的大小,使用LinkedBlockingQueue的重载构造函数。
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
希望这对您有所帮助。干杯!!!
例如,如果您使用一次能够连接 100 个连接的数据库 (psql)。任务可能需要 2000 毫秒 ...
int THREADS = 50;
ExecutorService exe = new ThreadPoolExecutor(THREADS,
50,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.CallerRunsPolicy()); ```