执行器线程池 - 限制队列大小和最旧的出队

Executor Thread Pool - limit queue size and dequeue oldest

我在 spring 引导应用程序中为生成消息的消费者使用固定线程池。我的生产者生产(很多)快于生产者能够处理消息的速度,因此线程池的队列似乎是 "flooding"。

限制队列大小的最佳方法是什么?预期的队列行为将是 "if the queue is full, remove the head and insert the new Runnable"。是否可以这样配置Executors线程池?

ThreadPoolExecutor 通过 ThreadPoolExecutor.DiscardOldestPolicy:

支持此功能

A handler for rejected tasks that discards the oldest unhandled request and then retries execute, unless the executor is shut down, in which case the task is discarded.

您需要使用此策略手动构建池,例如:

int poolSize = ...;
int queueSize = ...;
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();

ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<>(queueSize),
    handler);

这将为您创建一个您传递的大小的线程池。

ExecutorService service = Executors.newFixedThreadPool(THREAD_SIZE);

这在内部创建了一个 ThreadPoolExecutor 实例,它实现了 ExecutorService。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

要创建自定义广告池,您可以这样做。

ExecutorService service =   new ThreadPoolExecutor(5, 5,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(10));

这里我们可以指定队列的大小,使用LinkedBlockingQueue的重载构造函数。

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

希望这对您有所帮助。干杯!!!

例如,如果您使用一次能够连接 100 个连接的数据库 (psql)。任务可能需要 2000 毫秒 ...

        int THREADS = 50;
        ExecutorService exe = new ThreadPoolExecutor(THREADS,
                                                    50,
                                                    0L,
                                                     TimeUnit.MILLISECONDS,
                                                     new ArrayBlockingQueue<>(10),
                                                     new ThreadPoolExecutor.CallerRunsPolicy()); ```