在 ScheduledThreadPoolExecutor 中使用带有比较器的 PriorityBlockingQueue

Use PriorityBlockingQueue with Comparator in ScheduledThreadPoolExecutor

首先:我已经阅读了以下两个问题及其可能的解决方案:

我遇到的困境是我想使用自定义 BlockingQueue 或者更确切地说是不同但特定的队列,即 PriorityBlockingQueue 和自定义 Comparator 对队列进行排序按优先级。

ThreadPoolExecutor 确实在其构造函数中支持自定义队列,但它不实现 ScheduledExecutorService 接口中的方法。所以我去找了 subclass ScheduledThreadPoolExecutor,但它不支持自定义队列,而是使用 DelayedWorkQueue

问题:

我当前的实现如下所示:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.croemheld.tasks.PriorityTaskComparator;

public class ScheduledPriorityThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {

    private static final int INITIAL_QUEUE_SIZE = 10;

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()));
    }

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), handler);
    }

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), threadFactory);
    }

    public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
            new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), threadFactory, handler);
    }

    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        // TODO Auto-generated method stub
        return null;
    }

}

如您所见,构造函数问题已解决,但仍然存在 ScheduledExecutorService.

中的调度方法的实现

所以我问你,有没有什么方法可以将 Comparator 传递给队列,或者传递一个 简单的 而不是太详尽的方法来创建自己的队列执行器 class 实现了 ScheduledExecutorService 的方法并提供了 ThreadPoolExecutor class 的方法并使用了 PriorityBlockingQueue?

如果我理解你的问题,你想定期执行一些任务,但要根据一些自定义优先级。除了发明您自己的 ExecutorService,我建议退后一步并查看您的设计。您可能希望将调度与优先级排序和任务执行分开:

  1. 由于 ThreadPoolExecutor 接受自定义 BlockingQueue,您可以轻松实现自己的优先级排序。然后只需定期从代码中的其他地方提交任务。
  2. 如果您坚持使用 ScheduledThreadPoolExecutor,那么您可以安排时间,但您必须自己实施优先级排序。您可以通过它变得非常有创意,但一种选择可能是让编排任务从自定义 BlockingQueue 中挑选任务并提交到池中。

我寻找其他可能的解决方案并得出以下结果:

由于一个ThreadPoolExecutor管理着多个Threads的池(即,如果你在Executors.newFixedThreadPool(int nThreads)方法中设置了两个或多个线程),如果你真的想混合一个基于优先级 BlockingQueue ,那么我建议执行以下操作:

  • 创建一个自己的 ThreadPoolExecutor class 与上面的类似,使用带有自定义比较器的 PriorityBlockingQueue
  • 创建您自己的 Task class(或 FutureTask 扩展,您认为最适合您的即可)
  • 这些任务 classes 用于一次性任务,这意味着它们只 运行 一次。

对于应该 运行 在后台定期执行的循环任务,我为此想出了一个简单的 class:

public abstract class AbstractThread extends Thread {

    protected Runnable runnable;

    protected AbstractThread(String name, Runnable runnable) {
        super(runnable, name);

        this.runnable = runnable;
    }

    /**
     * This method provides a way to perform some action before the thread is actually starting.
     */
    protected abstract void beforeExecution();

    /**
     * This method provides a way to perform some action after the thread finished.
     */
    protected abstract void afterExecution();

    @Override
    public void run() {
        try {
            doRun();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * Run the given runnable here.
     * 
     * @throws InterruptedException 
     */
    protected abstract void doRun() throws InterruptedException;

}

虽然简单的一次性线程只是 运行 运行 启用一次:

@Override
protected void doRun() {
    beforeExecution();

    runnable.run();

    afterExecution();
}

线程中的周期性任务只会做类似的事情:

@Override
protected void doRun() throws InterruptedException {
    beforeExecution();

    while(!isInterrupted()) {
        runnable.run();
        Thread.sleep(millis);
    }

    afterExecution();
}

如果你想支持 运行 偶尔执行一次的周期性任务,你可以将延迟参数传递给 Thread 实例,或者你只需​​在 Thread.sleep(delay) 中编写类似的内容你的 运行nable.

这不是实际代码,只是一个建议,因为我现在正尝试使用它。

我使用 PriorityBlockingQueue 为 ThreadPoolExecutor 编写了一个简单、干净、有效的解决方案。

public class PriorityQueueThreadPoolExecutor {
    private static final int DEFAULT_INITIAL_PRIORITY_QUEUE_CAPACITY = 100;
    private static final long THREAD_TIMEOUT_IN_SECS = 60L;
    public static final int DEFAULT_PRIORITY = 0;

    private static final AtomicInteger InstanceCounter = new AtomicInteger(0);

    private final ThreadPoolExecutor internalExecutor;

    public PriorityQueueThreadPoolExecutor(int threadPoolSize, String threadNamePrefix) {
        internalExecutor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, THREAD_TIMEOUT_IN_SECS,
                TimeUnit.SECONDS, createPriorityQueue(), createThreadFactory(threadNamePrefix));
        internalExecutor.allowCoreThreadTimeOut(true);
    }

    public void submit(Runnable runnable, int priority) {
        internalExecutor.execute(new RunnableWithPriority(runnable, priority));
    }

    public void submit(Runnable runnable) {
        submit(runnable, DEFAULT_PRIORITY);
    }

    public ThreadPoolExecutor getInternalThreadPoolExecutor() {
        return internalExecutor;
    }

    private static BlockingQueue<Runnable> createPriorityQueue() {
        return new PriorityBlockingQueue<>(DEFAULT_INITIAL_PRIORITY_QUEUE_CAPACITY,
                new ComparatorForPriorityRunnable());
    }

    private static ThreadFactory createThreadFactory(String threadNamePrefix) {
        return new ThreadFactoryBuilder().setThreadFactory(Executors.defaultThreadFactory())
                .setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build();
    }

    private static class RunnableWithPriority implements Runnable {
        final int creationOrder;
        final int priority;
        final Runnable runnable;

        public RunnableWithPriority(Runnable runnable, int priority) {
            this.runnable = runnable;
            this.priority = priority;
            this.creationOrder = InstanceCounter.incrementAndGet();
        }

        @Override
        public void run() {
            runnable.run();
        }
    }

    private static class ComparatorForPriorityRunnable implements Comparator<Runnable> {
        @Override
        public int compare(Runnable r1, Runnable r2) {
            RunnableWithPriority pr1 = (RunnableWithPriority) r1;
            RunnableWithPriority pr2 = (RunnableWithPriority) r2;
            // higher value means higher priority
            int priorityResult = pr2.priority - pr1.priority;
            return priorityResult != 0 ? priorityResult : (pr1.creationOrder - pr2.creationOrder);
        }
    }
}