在 ScheduledThreadPoolExecutor 中使用带有比较器的 PriorityBlockingQueue
Use PriorityBlockingQueue with Comparator in ScheduledThreadPoolExecutor
首先:我已经阅读了以下两个问题及其可能的解决方案:
- ScheduledThreadPoolExecutors and custom queue
- Java Executors: how can I set task priority?
我遇到的困境是我想使用自定义 BlockingQueue
或者更确切地说是不同但特定的队列,即 PriorityBlockingQueue
和自定义 Comparator
对队列进行排序按优先级。
ThreadPoolExecutor
确实在其构造函数中支持自定义队列,但它不实现 ScheduledExecutorService
接口中的方法。所以我去找了 subclass ScheduledThreadPoolExecutor
,但它不支持自定义队列,而是使用 DelayedWorkQueue
。
问题:
- 我不能从
ScheduledThreadPoolExecutor
扩展,因为为我自己的 class 创建构造函数不会做任何事情,因为 ScheduledThreadPoolExecutor
的构造函数不接受自定义队列作为参数。
- 我无法复制
ThreadPoolExecutor
class 的内容和 ScheduledThreadPoolExecutor
的实现,因为它使用了很多用 no 修饰符声明的方法(例如 canRunInCurrentState(boolean periodic)
和此调用调用的所有方法)不允许我访问该方法,因为即使它是 ThreadPoolExecutor
的子 class,它是不在同一个包中。
我当前的实现如下所示:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.croemheld.tasks.PriorityTaskComparator;
public class ScheduledPriorityThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
private static final int INITIAL_QUEUE_SIZE = 10;
public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()));
}
public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), handler);
}
public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), threadFactory);
}
public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), threadFactory, handler);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
// TODO Auto-generated method stub
return null;
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
// TODO Auto-generated method stub
return null;
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
// TODO Auto-generated method stub
return null;
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
// TODO Auto-generated method stub
return null;
}
}
如您所见,构造函数问题已解决,但仍然存在 ScheduledExecutorService
.
中的调度方法的实现
所以我问你,有没有什么方法可以将 Comparator
传递给队列,或者传递一个 简单的 而不是太详尽的方法来创建自己的队列执行器 class 实现了 ScheduledExecutorService
的方法并提供了 ThreadPoolExecutor
class 的方法并使用了 PriorityBlockingQueue
?
如果我理解你的问题,你想定期执行一些任务,但要根据一些自定义优先级。除了发明您自己的 ExecutorService
,我建议退后一步并查看您的设计。您可能希望将调度与优先级排序和任务执行分开:
- 由于
ThreadPoolExecutor
接受自定义 BlockingQueue
,您可以轻松实现自己的优先级排序。然后只需定期从代码中的其他地方提交任务。
- 如果您坚持使用
ScheduledThreadPoolExecutor
,那么您可以安排时间,但您必须自己实施优先级排序。您可以通过它变得非常有创意,但一种选择可能是让编排任务从自定义 BlockingQueue
中挑选任务并提交到池中。
我寻找其他可能的解决方案并得出以下结果:
由于一个ThreadPoolExecutor
管理着多个Threads
的池(即,如果你在Executors.newFixedThreadPool(int nThreads)
方法中设置了两个或多个线程),如果你真的想混合一个基于优先级 BlockingQueue
,那么我建议执行以下操作:
- 创建一个自己的
ThreadPoolExecutor
class 与上面的类似,使用带有自定义比较器的 PriorityBlockingQueue
。
- 创建您自己的
Task
class(或 FutureTask
扩展,您认为最适合您的即可)
- 这些任务 classes 用于一次性任务,这意味着它们只 运行 一次。
对于应该 运行 在后台定期执行的循环任务,我为此想出了一个简单的 class:
public abstract class AbstractThread extends Thread {
protected Runnable runnable;
protected AbstractThread(String name, Runnable runnable) {
super(runnable, name);
this.runnable = runnable;
}
/**
* This method provides a way to perform some action before the thread is actually starting.
*/
protected abstract void beforeExecution();
/**
* This method provides a way to perform some action after the thread finished.
*/
protected abstract void afterExecution();
@Override
public void run() {
try {
doRun();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* Run the given runnable here.
*
* @throws InterruptedException
*/
protected abstract void doRun() throws InterruptedException;
}
虽然简单的一次性线程只是 运行 运行 启用一次:
@Override
protected void doRun() {
beforeExecution();
runnable.run();
afterExecution();
}
线程中的周期性任务只会做类似的事情:
@Override
protected void doRun() throws InterruptedException {
beforeExecution();
while(!isInterrupted()) {
runnable.run();
Thread.sleep(millis);
}
afterExecution();
}
如果你想支持 运行 偶尔执行一次的周期性任务,你可以将延迟参数传递给 Thread
实例,或者你只需在 Thread.sleep(delay)
中编写类似的内容你的 运行nable.
这不是实际代码,只是一个建议,因为我现在正尝试使用它。
我使用 PriorityBlockingQueue 为 ThreadPoolExecutor 编写了一个简单、干净、有效的解决方案。
public class PriorityQueueThreadPoolExecutor {
private static final int DEFAULT_INITIAL_PRIORITY_QUEUE_CAPACITY = 100;
private static final long THREAD_TIMEOUT_IN_SECS = 60L;
public static final int DEFAULT_PRIORITY = 0;
private static final AtomicInteger InstanceCounter = new AtomicInteger(0);
private final ThreadPoolExecutor internalExecutor;
public PriorityQueueThreadPoolExecutor(int threadPoolSize, String threadNamePrefix) {
internalExecutor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, THREAD_TIMEOUT_IN_SECS,
TimeUnit.SECONDS, createPriorityQueue(), createThreadFactory(threadNamePrefix));
internalExecutor.allowCoreThreadTimeOut(true);
}
public void submit(Runnable runnable, int priority) {
internalExecutor.execute(new RunnableWithPriority(runnable, priority));
}
public void submit(Runnable runnable) {
submit(runnable, DEFAULT_PRIORITY);
}
public ThreadPoolExecutor getInternalThreadPoolExecutor() {
return internalExecutor;
}
private static BlockingQueue<Runnable> createPriorityQueue() {
return new PriorityBlockingQueue<>(DEFAULT_INITIAL_PRIORITY_QUEUE_CAPACITY,
new ComparatorForPriorityRunnable());
}
private static ThreadFactory createThreadFactory(String threadNamePrefix) {
return new ThreadFactoryBuilder().setThreadFactory(Executors.defaultThreadFactory())
.setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build();
}
private static class RunnableWithPriority implements Runnable {
final int creationOrder;
final int priority;
final Runnable runnable;
public RunnableWithPriority(Runnable runnable, int priority) {
this.runnable = runnable;
this.priority = priority;
this.creationOrder = InstanceCounter.incrementAndGet();
}
@Override
public void run() {
runnable.run();
}
}
private static class ComparatorForPriorityRunnable implements Comparator<Runnable> {
@Override
public int compare(Runnable r1, Runnable r2) {
RunnableWithPriority pr1 = (RunnableWithPriority) r1;
RunnableWithPriority pr2 = (RunnableWithPriority) r2;
// higher value means higher priority
int priorityResult = pr2.priority - pr1.priority;
return priorityResult != 0 ? priorityResult : (pr1.creationOrder - pr2.creationOrder);
}
}
}
首先:我已经阅读了以下两个问题及其可能的解决方案:
- ScheduledThreadPoolExecutors and custom queue
- Java Executors: how can I set task priority?
我遇到的困境是我想使用自定义 BlockingQueue
或者更确切地说是不同但特定的队列,即 PriorityBlockingQueue
和自定义 Comparator
对队列进行排序按优先级。
ThreadPoolExecutor
确实在其构造函数中支持自定义队列,但它不实现 ScheduledExecutorService
接口中的方法。所以我去找了 subclass ScheduledThreadPoolExecutor
,但它不支持自定义队列,而是使用 DelayedWorkQueue
。
问题:
- 我不能从
ScheduledThreadPoolExecutor
扩展,因为为我自己的 class 创建构造函数不会做任何事情,因为ScheduledThreadPoolExecutor
的构造函数不接受自定义队列作为参数。 - 我无法复制
ThreadPoolExecutor
class 的内容和ScheduledThreadPoolExecutor
的实现,因为它使用了很多用 no 修饰符声明的方法(例如canRunInCurrentState(boolean periodic)
和此调用调用的所有方法)不允许我访问该方法,因为即使它是ThreadPoolExecutor
的子 class,它是不在同一个包中。
我当前的实现如下所示:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.croemheld.tasks.PriorityTaskComparator;
public class ScheduledPriorityThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
private static final int INITIAL_QUEUE_SIZE = 10;
public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()));
}
public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), handler);
}
public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), threadFactory);
}
public ScheduledPriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit,
new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_SIZE, new PriorityTaskComparator()), threadFactory, handler);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
// TODO Auto-generated method stub
return null;
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
// TODO Auto-generated method stub
return null;
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
// TODO Auto-generated method stub
return null;
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
// TODO Auto-generated method stub
return null;
}
}
如您所见,构造函数问题已解决,但仍然存在 ScheduledExecutorService
.
所以我问你,有没有什么方法可以将 Comparator
传递给队列,或者传递一个 简单的 而不是太详尽的方法来创建自己的队列执行器 class 实现了 ScheduledExecutorService
的方法并提供了 ThreadPoolExecutor
class 的方法并使用了 PriorityBlockingQueue
?
如果我理解你的问题,你想定期执行一些任务,但要根据一些自定义优先级。除了发明您自己的 ExecutorService
,我建议退后一步并查看您的设计。您可能希望将调度与优先级排序和任务执行分开:
- 由于
ThreadPoolExecutor
接受自定义BlockingQueue
,您可以轻松实现自己的优先级排序。然后只需定期从代码中的其他地方提交任务。 - 如果您坚持使用
ScheduledThreadPoolExecutor
,那么您可以安排时间,但您必须自己实施优先级排序。您可以通过它变得非常有创意,但一种选择可能是让编排任务从自定义BlockingQueue
中挑选任务并提交到池中。
我寻找其他可能的解决方案并得出以下结果:
由于一个ThreadPoolExecutor
管理着多个Threads
的池(即,如果你在Executors.newFixedThreadPool(int nThreads)
方法中设置了两个或多个线程),如果你真的想混合一个基于优先级 BlockingQueue
,那么我建议执行以下操作:
- 创建一个自己的
ThreadPoolExecutor
class 与上面的类似,使用带有自定义比较器的PriorityBlockingQueue
。 - 创建您自己的
Task
class(或FutureTask
扩展,您认为最适合您的即可) - 这些任务 classes 用于一次性任务,这意味着它们只 运行 一次。
对于应该 运行 在后台定期执行的循环任务,我为此想出了一个简单的 class:
public abstract class AbstractThread extends Thread {
protected Runnable runnable;
protected AbstractThread(String name, Runnable runnable) {
super(runnable, name);
this.runnable = runnable;
}
/**
* This method provides a way to perform some action before the thread is actually starting.
*/
protected abstract void beforeExecution();
/**
* This method provides a way to perform some action after the thread finished.
*/
protected abstract void afterExecution();
@Override
public void run() {
try {
doRun();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* Run the given runnable here.
*
* @throws InterruptedException
*/
protected abstract void doRun() throws InterruptedException;
}
虽然简单的一次性线程只是 运行 运行 启用一次:
@Override
protected void doRun() {
beforeExecution();
runnable.run();
afterExecution();
}
线程中的周期性任务只会做类似的事情:
@Override
protected void doRun() throws InterruptedException {
beforeExecution();
while(!isInterrupted()) {
runnable.run();
Thread.sleep(millis);
}
afterExecution();
}
如果你想支持 运行 偶尔执行一次的周期性任务,你可以将延迟参数传递给 Thread
实例,或者你只需在 Thread.sleep(delay)
中编写类似的内容你的 运行nable.
这不是实际代码,只是一个建议,因为我现在正尝试使用它。
我使用 PriorityBlockingQueue 为 ThreadPoolExecutor 编写了一个简单、干净、有效的解决方案。
public class PriorityQueueThreadPoolExecutor {
private static final int DEFAULT_INITIAL_PRIORITY_QUEUE_CAPACITY = 100;
private static final long THREAD_TIMEOUT_IN_SECS = 60L;
public static final int DEFAULT_PRIORITY = 0;
private static final AtomicInteger InstanceCounter = new AtomicInteger(0);
private final ThreadPoolExecutor internalExecutor;
public PriorityQueueThreadPoolExecutor(int threadPoolSize, String threadNamePrefix) {
internalExecutor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, THREAD_TIMEOUT_IN_SECS,
TimeUnit.SECONDS, createPriorityQueue(), createThreadFactory(threadNamePrefix));
internalExecutor.allowCoreThreadTimeOut(true);
}
public void submit(Runnable runnable, int priority) {
internalExecutor.execute(new RunnableWithPriority(runnable, priority));
}
public void submit(Runnable runnable) {
submit(runnable, DEFAULT_PRIORITY);
}
public ThreadPoolExecutor getInternalThreadPoolExecutor() {
return internalExecutor;
}
private static BlockingQueue<Runnable> createPriorityQueue() {
return new PriorityBlockingQueue<>(DEFAULT_INITIAL_PRIORITY_QUEUE_CAPACITY,
new ComparatorForPriorityRunnable());
}
private static ThreadFactory createThreadFactory(String threadNamePrefix) {
return new ThreadFactoryBuilder().setThreadFactory(Executors.defaultThreadFactory())
.setNameFormat(threadNamePrefix + "-%d").setDaemon(true).build();
}
private static class RunnableWithPriority implements Runnable {
final int creationOrder;
final int priority;
final Runnable runnable;
public RunnableWithPriority(Runnable runnable, int priority) {
this.runnable = runnable;
this.priority = priority;
this.creationOrder = InstanceCounter.incrementAndGet();
}
@Override
public void run() {
runnable.run();
}
}
private static class ComparatorForPriorityRunnable implements Comparator<Runnable> {
@Override
public int compare(Runnable r1, Runnable r2) {
RunnableWithPriority pr1 = (RunnableWithPriority) r1;
RunnableWithPriority pr2 = (RunnableWithPriority) r2;
// higher value means higher priority
int priorityResult = pr2.priority - pr1.priority;
return priorityResult != 0 ? priorityResult : (pr1.creationOrder - pr2.creationOrder);
}
}
}