在 ThreadPoolExecutor 中实现优先级队列
implementing PriorityQueue on ThreadPoolExecutor
现在已经为此苦苦挣扎了 2 天多了。
实现了我在这里看到的答案
Specify task order execution in Java
public class PriorityExecutor extends ThreadPoolExecutor {
public PriorityExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
//Utitlity method to create thread pool easily
public static ExecutorService newFixedThreadPool(int nThreads) {
return new PriorityExecutor(nThreads, nThreads, 0L,
TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
}
//Submit with New comparable task
public Future<?> submit(Runnable task, int priority) {
return super.submit(new ComparableFutureTask(task, null, priority));
}
//execute with New comparable task
public void execute(Runnable command, int priority) {
super.execute(new ComparableFutureTask(command, null, priority));
}
}
public class ComparableFutureTask<T> extends FutureTask<T>
implements
Comparable<ComparableFutureTask<T>> {
volatile int priority = 0;
public ComparableFutureTask(Runnable runnable, T result, int priority) {
super(runnable, result);
this.priority = priority;
}
public ComparableFutureTask(Callable<T> callable, int priority) {
super(callable);
this.priority = priority;
}
@Override
public int compareTo(ComparableFutureTask<T> o) {
return Integer.valueOf(priority).compareTo(o.priority);
}
}
我使用的Runnable:MyTask
public class MyTask implements Runnable{
public MyTask(File file, Context context, int requestId) {
this._file = file;
this.context = context;
this.requestId = requestId;
}
@Override
public void run() {
// some work
} catch (IOException e) {
Log.e("Callable try", post.toString());
}
}
我的服务:MediaDownloadService
public class MediaDownloadService extends Service {
private DBHelper helper;
Notification notification;
HashMap<Integer,Future> futureTasks = new HashMap<Integer, Future>();
final int _notificationId=1;
File file;
@Override
public IBinder onBind(Intent intent) {
return sharonsBinder;
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
helper = new DBHelper(getApplicationContext());
PriorityExecutor executor = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(3);
Log.e("requestsExists", helper.requestsExists() + "");
if(helper.requestsExists()){
// map of the index of the request and the string of the absolute path of the request
Map<Integer,String> requestMap = helper.getRequestsToExcute(0);
Set<Integer> keySet = requestMap.keySet();
Iterator<Integer> iterator = keySet.iterator();
Log.e("MAP",requestMap.toString());
//checks if the DB requests exists
if(!requestMap.isEmpty()){
//execute them and delete the DB entry
while(iterator.hasNext()){
int iteratorNext = iterator.next();
Log.e("ITREATOR", iteratorNext + "");
file = new File(requestMap.get(iteratorNext));
Log.e("file", file.toString());
Log.e("thread Opened", "Thread" + iteratorNext);
Future future = executor.submit(new MyTask(file, this, iteratorNext),10);
futureTasks.put(iteratorNext, future);
helper.requestTaken(iteratorNext);
}
Log.e("The priority queue",executor.getQueue().toString());
}else{
Log.e("stopself", "stop self after this");
this.stopSelf();
}
}
return START_STICKY;
}
这一行一直出错:
未来 future = executor.submit(new MyTask(file, this, iteratorNext),10);
甚至 executor.submit();假设 return 一个我不断得到的未来对象
Caused by: java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:318)
at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:450)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1331)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:81)
at com.vit.infibond.test.PriorityExecutor.submit(PriorityExecutor.java:26)
at com.vit.infibond.test.MediaDownloadService.onStartCommand(MediaDownloadService.java:65)
谁能把我从这场噩梦中拯救出来?
我也尝试按照这个答案的建议去做
Testing PriorityBlockingQueue in ThreadPoolExecutor
通过添加 forNewTask 重写只是为了再次获得转换执行,但这次是为了 RunnableFuture。
我明白我的理解缺少一些基本的东西,希望得到深入的解释...
通过查看 java.util.concurrent.ThreadPoolExecutor
的源代码,提交期货时似乎很难让它正常工作。您必须覆盖感觉内部的受保护方法并进行一些讨厌的转换。
我建议您只使用 execute
方法。 Runnable
没有包装,所以它应该可以工作。
如果您需要等待作业的结果,我建议您自行实施,以免弄乱 ThreadPoolExecutor
内部结构。
sharon gur 在最底部的建议是改变
//execute with New comparable task
public void execute(Runnable command, int priority) {
super.execute(new ComparableFutureTask(command, null, priority));
}
到
//execute with New comparable task
public ComparableFutureTask execute(Runnable command, int priority) {
ComparableFutureTask task = new ComparableFutureTask(command, null, priority);
super.execute(task);
return task;
}
然后在你的来电中:
CurrentTask currentTask = new CurrentTask(priority,queue)
RunnableFuture task = enhancedExecutor.execute(currentTask,priority.value)
task?.get()
我遇到了一个问题
RunnableFuture task = myExecutor.submit(currentTask)
task?.get()
导致 currentTask
被转换为 FutureTask
并且无法理解我在 CurrentTask 中的对象。作为 .execute
一个人,一切都很好。这个黑客似乎半/接近足够的工作。
因为它工作完美但没有生成文件
RunnableFuture task = myExecutor.execuute(currentTask)
task?.get()
所以这就是我让它工作的方式(优先级被处理两次)感觉不对但有效...
当前任务::
class CurrentTask implements Runnable {
private Priority priority
private MyQueue queue
public int getPriority() {
return priority.value
}
public CurrentTask(Priority priority,ReportsQueue queue){
this.priority = priority
this.queue=queue
}
@Override
public void run() {
...
}
}
优先级:
public enum Priority {
HIGHEST(0),
HIGH(1),
MEDIUM(2),
LOW(3),
LOWEST(4)
int value
Priority(int val) {
this.value = val
}
public int getValue(){
return value
}
}
然后你的遗嘱执行人打电话
public YourExecutor() {
public YourExecutor() {
super(maxPoolSize,maxPoolSize,timeout,TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(1000,new ReverseComparator()))
}
所以在更改为新方法提交之前,请点击下面的比较器并且作为 TaskExecutor 将无法理解 .priority?.value ,默认情况下 .execute currentTask 是命中此点的原因,并且一切正常
public int compare(final Runnable lhs, final Runnable rhs) {
if(lhs instanceof Runnable && rhs instanceof Runnable){
// Favour a higher priority
println "${lhs} vs ${lhs.getClass()}"
if(((Runnable)lhs)?.priority?.value<((Runnable)rhs)?.priority?.value){
...
}
}
所以通过上面的 hack 和下面的更改它似乎工作了
class ReverseComparator implements Comparator<ComparableFutureTask>{
@Override
public int compare(final ComparableFutureTask lhs, final ComparableFutureTask rhs) {
if(lhs instanceof ComparableFutureTask && rhs instanceof ComparableFutureTask){
// run higher priority (lower numbers before higher numbers)
println "${lhs} vs ${lhs.getClass()} ::: ${lhs.priority}"
if(((Runnable)lhs)?.priority<((Runnable)rhs)?.priority){
println "-returning -1"
return -1;
} else if (((Runnable)lhs)?.priority>((Runnable)rhs)?.priority){
println "-returning @@@1"
return 1;
}
}
println "-returning ==0 "
return 0;
}
只是因为我们传入了具有优先级的覆盖 ComparableFutureTask
扩展 FutureTask
希望它绕了一天和现在是有意义的:)
现在已经为此苦苦挣扎了 2 天多了。
实现了我在这里看到的答案 Specify task order execution in Java
public class PriorityExecutor extends ThreadPoolExecutor {
public PriorityExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
//Utitlity method to create thread pool easily
public static ExecutorService newFixedThreadPool(int nThreads) {
return new PriorityExecutor(nThreads, nThreads, 0L,
TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
}
//Submit with New comparable task
public Future<?> submit(Runnable task, int priority) {
return super.submit(new ComparableFutureTask(task, null, priority));
}
//execute with New comparable task
public void execute(Runnable command, int priority) {
super.execute(new ComparableFutureTask(command, null, priority));
}
}
public class ComparableFutureTask<T> extends FutureTask<T>
implements
Comparable<ComparableFutureTask<T>> {
volatile int priority = 0;
public ComparableFutureTask(Runnable runnable, T result, int priority) {
super(runnable, result);
this.priority = priority;
}
public ComparableFutureTask(Callable<T> callable, int priority) {
super(callable);
this.priority = priority;
}
@Override
public int compareTo(ComparableFutureTask<T> o) {
return Integer.valueOf(priority).compareTo(o.priority);
}
}
我使用的Runnable:MyTask
public class MyTask implements Runnable{
public MyTask(File file, Context context, int requestId) {
this._file = file;
this.context = context;
this.requestId = requestId;
}
@Override
public void run() {
// some work
} catch (IOException e) {
Log.e("Callable try", post.toString());
}
}
我的服务:MediaDownloadService
public class MediaDownloadService extends Service {
private DBHelper helper;
Notification notification;
HashMap<Integer,Future> futureTasks = new HashMap<Integer, Future>();
final int _notificationId=1;
File file;
@Override
public IBinder onBind(Intent intent) {
return sharonsBinder;
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
helper = new DBHelper(getApplicationContext());
PriorityExecutor executor = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(3);
Log.e("requestsExists", helper.requestsExists() + "");
if(helper.requestsExists()){
// map of the index of the request and the string of the absolute path of the request
Map<Integer,String> requestMap = helper.getRequestsToExcute(0);
Set<Integer> keySet = requestMap.keySet();
Iterator<Integer> iterator = keySet.iterator();
Log.e("MAP",requestMap.toString());
//checks if the DB requests exists
if(!requestMap.isEmpty()){
//execute them and delete the DB entry
while(iterator.hasNext()){
int iteratorNext = iterator.next();
Log.e("ITREATOR", iteratorNext + "");
file = new File(requestMap.get(iteratorNext));
Log.e("file", file.toString());
Log.e("thread Opened", "Thread" + iteratorNext);
Future future = executor.submit(new MyTask(file, this, iteratorNext),10);
futureTasks.put(iteratorNext, future);
helper.requestTaken(iteratorNext);
}
Log.e("The priority queue",executor.getQueue().toString());
}else{
Log.e("stopself", "stop self after this");
this.stopSelf();
}
}
return START_STICKY;
}
这一行一直出错: 未来 future = executor.submit(new MyTask(file, this, iteratorNext),10);
甚至 executor.submit();假设 return 一个我不断得到的未来对象
Caused by: java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:318)
at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:450)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1331)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:81)
at com.vit.infibond.test.PriorityExecutor.submit(PriorityExecutor.java:26)
at com.vit.infibond.test.MediaDownloadService.onStartCommand(MediaDownloadService.java:65)
谁能把我从这场噩梦中拯救出来?
我也尝试按照这个答案的建议去做 Testing PriorityBlockingQueue in ThreadPoolExecutor
通过添加 forNewTask 重写只是为了再次获得转换执行,但这次是为了 RunnableFuture。
我明白我的理解缺少一些基本的东西,希望得到深入的解释...
通过查看 java.util.concurrent.ThreadPoolExecutor
的源代码,提交期货时似乎很难让它正常工作。您必须覆盖感觉内部的受保护方法并进行一些讨厌的转换。
我建议您只使用 execute
方法。 Runnable
没有包装,所以它应该可以工作。
如果您需要等待作业的结果,我建议您自行实施,以免弄乱 ThreadPoolExecutor
内部结构。
sharon gur 在最底部的建议是改变
//execute with New comparable task
public void execute(Runnable command, int priority) {
super.execute(new ComparableFutureTask(command, null, priority));
}
到
//execute with New comparable task
public ComparableFutureTask execute(Runnable command, int priority) {
ComparableFutureTask task = new ComparableFutureTask(command, null, priority);
super.execute(task);
return task;
}
然后在你的来电中:
CurrentTask currentTask = new CurrentTask(priority,queue)
RunnableFuture task = enhancedExecutor.execute(currentTask,priority.value)
task?.get()
我遇到了一个问题
RunnableFuture task = myExecutor.submit(currentTask)
task?.get()
导致 currentTask
被转换为 FutureTask
并且无法理解我在 CurrentTask 中的对象。作为 .execute
一个人,一切都很好。这个黑客似乎半/接近足够的工作。
因为它工作完美但没有生成文件
RunnableFuture task = myExecutor.execuute(currentTask)
task?.get()
所以这就是我让它工作的方式(优先级被处理两次)感觉不对但有效...
当前任务::
class CurrentTask implements Runnable {
private Priority priority
private MyQueue queue
public int getPriority() {
return priority.value
}
public CurrentTask(Priority priority,ReportsQueue queue){
this.priority = priority
this.queue=queue
}
@Override
public void run() {
...
}
}
优先级:
public enum Priority {
HIGHEST(0),
HIGH(1),
MEDIUM(2),
LOW(3),
LOWEST(4)
int value
Priority(int val) {
this.value = val
}
public int getValue(){
return value
}
}
然后你的遗嘱执行人打电话
public YourExecutor() {
public YourExecutor() {
super(maxPoolSize,maxPoolSize,timeout,TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(1000,new ReverseComparator()))
}
所以在更改为新方法提交之前,请点击下面的比较器并且作为 TaskExecutor 将无法理解 .priority?.value ,默认情况下 .execute currentTask 是命中此点的原因,并且一切正常
public int compare(final Runnable lhs, final Runnable rhs) {
if(lhs instanceof Runnable && rhs instanceof Runnable){
// Favour a higher priority
println "${lhs} vs ${lhs.getClass()}"
if(((Runnable)lhs)?.priority?.value<((Runnable)rhs)?.priority?.value){
...
}
}
所以通过上面的 hack 和下面的更改它似乎工作了
class ReverseComparator implements Comparator<ComparableFutureTask>{
@Override
public int compare(final ComparableFutureTask lhs, final ComparableFutureTask rhs) {
if(lhs instanceof ComparableFutureTask && rhs instanceof ComparableFutureTask){
// run higher priority (lower numbers before higher numbers)
println "${lhs} vs ${lhs.getClass()} ::: ${lhs.priority}"
if(((Runnable)lhs)?.priority<((Runnable)rhs)?.priority){
println "-returning -1"
return -1;
} else if (((Runnable)lhs)?.priority>((Runnable)rhs)?.priority){
println "-returning @@@1"
return 1;
}
}
println "-returning ==0 "
return 0;
}
只是因为我们传入了具有优先级的覆盖 ComparableFutureTask
扩展 FutureTask
希望它绕了一天和现在是有意义的:)