在 ThreadPoolExecutor 中实现优先级队列

implementing PriorityQueue on ThreadPoolExecutor

现在已经为此苦苦挣扎了 2 天多了。

实现了我在这里看到的答案 Specify task order execution in Java

public class PriorityExecutor extends ThreadPoolExecutor {

public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                        long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
//Utitlity method to create thread pool easily
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new PriorityExecutor(nThreads, nThreads, 0L,
            TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
}
//Submit with New comparable task
public Future<?> submit(Runnable task, int priority) {
    return super.submit(new ComparableFutureTask(task, null, priority));
}
//execute with New comparable task
public void execute(Runnable command, int priority) {
    super.execute(new ComparableFutureTask(command, null, priority));
}
}

public class ComparableFutureTask<T> extends FutureTask<T>
    implements
    Comparable<ComparableFutureTask<T>> {

volatile int priority = 0;

public ComparableFutureTask(Runnable runnable, T result, int priority) {
    super(runnable, result);
    this.priority = priority;
}
public ComparableFutureTask(Callable<T> callable, int priority) {
    super(callable);
    this.priority = priority;
}

@Override
public int compareTo(ComparableFutureTask<T> o) {
    return Integer.valueOf(priority).compareTo(o.priority);
}
}

我使用的Runnable:MyTask

public class MyTask implements Runnable{

 public MyTask(File file, Context context, int requestId) {
    this._file = file;
    this.context = context;
    this.requestId = requestId;
}

@Override
public void run() {
      // some work
    } catch (IOException e) {
        Log.e("Callable try", post.toString());

    }
}

我的服务:MediaDownloadService

public class MediaDownloadService extends Service {

private DBHelper helper;
Notification notification;
HashMap<Integer,Future> futureTasks = new HashMap<Integer, Future>();
final int _notificationId=1;
File file;

@Override
public IBinder onBind(Intent intent) {
    return sharonsBinder;
}


@Override
public int onStartCommand(Intent intent, int flags, int startId) {
    helper = new DBHelper(getApplicationContext());
    PriorityExecutor executor = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(3);
    Log.e("requestsExists", helper.requestsExists() + "");
   if(helper.requestsExists()){
        // map of the index of the request and the string of the absolute path of the request
        Map<Integer,String> requestMap = helper.getRequestsToExcute(0);
        Set<Integer> keySet = requestMap.keySet();
        Iterator<Integer> iterator = keySet.iterator();
        Log.e("MAP",requestMap.toString());
        //checks if the DB requests exists
        if(!requestMap.isEmpty()){
            //execute them and delete the DB entry
            while(iterator.hasNext()){
                int iteratorNext = iterator.next();
                Log.e("ITREATOR", iteratorNext + "");
                file = new File(requestMap.get(iteratorNext));
                Log.e("file", file.toString());
                Log.e("thread Opened", "Thread" + iteratorNext);
                Future future = executor.submit(new MyTask(file, this, iteratorNext),10);
                futureTasks.put(iteratorNext, future);
                helper.requestTaken(iteratorNext);
            }
            Log.e("The priority queue",executor.getQueue().toString());
        }else{

            Log.e("stopself", "stop self after this");
            this.stopSelf();
        }
    }
    return START_STICKY;
}

这一行一直出错: 未来 future = executor.submit(new MyTask(file, this, iteratorNext),10);

甚至 executor.submit();假设 return 一个我不断得到的未来对象

Caused by: java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
        at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:318)
        at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:450)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1331)
        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:81)
        at com.vit.infibond.test.PriorityExecutor.submit(PriorityExecutor.java:26)
        at com.vit.infibond.test.MediaDownloadService.onStartCommand(MediaDownloadService.java:65)

谁能把我从这场噩梦中拯救出来?

我也尝试按照这个答案的建议去做 Testing PriorityBlockingQueue in ThreadPoolExecutor

通过添加 forNewTask 重写只是为了再次获得转换执行,但这次是为了 RunnableFuture。

我明白我的理解缺少一些基本的东西,希望得到深入的解释...

通过查看 java.util.concurrent.ThreadPoolExecutor 的源代码,提交期货时似乎很难让它正常工作。您必须覆盖感觉内部的受保护方法并进行一些讨厌的转换。

我建议您只使用 execute 方法。 Runnable 没有包装,所以它应该可以工作。

如果您需要等待作业的结果,我建议您自行实施,以免弄乱 ThreadPoolExecutor 内部结构。

sharon gur 在最底部的建议是改变

//execute with New comparable task
public void execute(Runnable command, int priority) {
    super.execute(new ComparableFutureTask(command, null, priority));
}

//execute with New comparable task
public ComparableFutureTask  execute(Runnable command, int priority) {
    ComparableFutureTask task = new ComparableFutureTask(command, null, priority);
    super.execute(task);
    return task;
}

然后在你的来电中:

CurrentTask currentTask = new CurrentTask(priority,queue)
RunnableFuture task = enhancedExecutor.execute(currentTask,priority.value)
task?.get()

我遇到了一个问题

RunnableFuture task = myExecutor.submit(currentTask)
task?.get()

导致 currentTask 被转换为 FutureTask 并且无法理解我在 CurrentTask 中的对象。作为 .execute 一个人,一切都很好。这个黑客似乎半/接近足够的工作。

因为它工作完美但没有生成文件

RunnableFuture task = myExecutor.execuute(currentTask)
    task?.get()

所以这就是我让它工作的方式(优先级被处理两次)感觉不对但有效...

当前任务::

class CurrentTask implements Runnable {
    private Priority priority
    private MyQueue queue

    public int getPriority() {
        return priority.value
    }

    public CurrentTask(Priority priority,ReportsQueue queue){
        this.priority = priority
        this.queue=queue
    }

    @Override
    public void run() {
...
}
}

优先级:

public enum Priority {

    HIGHEST(0),
    HIGH(1),
    MEDIUM(2),
    LOW(3),
    LOWEST(4)

    int value

    Priority(int val) {
        this.value = val
    }

    public int getValue(){
        return value
    }
}

然后你的遗嘱执行人打电话

public YourExecutor() {

    public YourExecutor() {
        super(maxPoolSize,maxPoolSize,timeout,TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(1000,new ReverseComparator()))
    }

所以在更改为新方法提交之前,请点击下面的比较器并且作为 TaskExecutor 将无法理解 .priority?.value ,默认情况下 .execute currentTask 是命中此点的原因,并且一切正常

public int compare(final Runnable lhs, final Runnable rhs) {

    if(lhs instanceof Runnable && rhs instanceof Runnable){
      // Favour a higher priority
        println "${lhs} vs ${lhs.getClass()}"
      if(((Runnable)lhs)?.priority?.value<((Runnable)rhs)?.priority?.value){
 ...
}

}

所以通过上面的 hack 和下面的更改它似乎工作了

class  ReverseComparator implements Comparator<ComparableFutureTask>{

  @Override
  public int compare(final ComparableFutureTask lhs, final ComparableFutureTask rhs) {

    if(lhs instanceof ComparableFutureTask && rhs instanceof ComparableFutureTask){

        // run higher priority (lower numbers before higher numbers)
        println "${lhs} vs ${lhs.getClass()} ::: ${lhs.priority}"
      if(((Runnable)lhs)?.priority<((Runnable)rhs)?.priority){
          println "-returning -1"
        return -1;
      } else if (((Runnable)lhs)?.priority>((Runnable)rhs)?.priority){
      println "-returning @@@1"
        return 1;
      } 


    }
    println "-returning ==0 "
    return 0;
  }  

只是因为我们传入了具有优先级的覆盖 ComparableFutureTask 扩展 FutureTask

希望它绕了一天和现在是有意义的:)