如何在 ArrayBlockingQueue 中使用 Predicate 方法 "removeIf"

How to use the method "removeIf" using a Predicate in a ArrayBlockingQueue

我有以下 类:

WorkerTask.java

   public interface WorkerTask extends Task {

   // Constants
   public static final short WORKERTASK_SPIDER = 1;
   public static final short WORKERTASK_PARSER = 2;
   public static final short WORKERTASK_PRODUCT = 3;

   public int getType();
}

WorkerPool.java

class workerPool {

     private ThreadPoolExecutor executorPool_;

     //----------------------------------------------------  

     public WorkerPool(int poolSize) 
     {
        executorPool_ = new ThreadPoolExecutor(
           poolSize,5,10,TimeUnit.SECONDS,
           new ArrayBlockingQueue<Runnable>(10000000,false),
           Executors.defaultThreadFactory()
     );

     //----------------------------------------------------        

     public void assign(WorkerTask workerTask) {
         executorPool_.execute(new WorkerThread(workerTask));
     }

     //----------------------------------------------------  

     public void removeTasks(int siteID) {
        executorPool_.getQueue().removeIf(...);     
     }
}

我想调用 removeTasks 方法来删​​除一定数量的待处理任务,但我不知道如何使用 removeIf 方法。它说:删除此集合中满足给定谓词的所有元素,但我不知道如何创建参数谓词。有什么想法吗?

谓词是一个接收输入和 returns 布尔值的函数。

如果您使用的是 java8,则可以使用 lambda 表达式: (elem) -> return elem.id == siteID

如果你有 Queue<WorkerTask>,你可以这样做:

queue.removeIf(task -> task.getSiteID() == siteID)

有几个问题。一个问题是您从 getQueue() 获得的队列是 BlockingQueue<Runnable> 而不是 Queue<WorkerTask>。如果您向池中提交 Runnable 个实例,队列可能包含对您的实际任务的引用;如果是这样,您可以将它们向下转换为 WorkerTask。然而,这并不能保证。此外,ThreadPoolExecutor 的 class 文档说(在 "Queue maintenance" 下):

Method getQueue() allows access to the work queue for purposes of monitoring and debugging. Use of this method for any other purpose is strongly discouraged. Two supplied methods, remove(Runnable) and purge() are available to assist in storage reclamation when large numbers of queued tasks become cancelled.

查看 remove(Runnable) 方法,它的文档说

It may fail to remove tasks that have been converted into other forms before being placed on the internal queue.

这表明您应该挂起已提交的 Runnable 个实例,以便稍后调用 remove()。或者,调用 submit(Runnable) 获取 Future 并保存这些实例以取消它们。

但是还有第二个问题可能导致此方法不适用。假设您找到了一种从队列中删除或取消匹配任务的方法。另一个线程可能已决定提交匹配的新任务,但尚未提交。这里有一个竞争条件。您也许可以取消排队的任务,但在您这样做之后,您不能保证没有提交新的匹配任务。

这是另一种方法。据推测,当您取消(或其他)站点 ID 时,某处有一些逻辑可以停止提交与该端 ID 匹配的新任务。问题是如何处理"in-flight,"即在队列中或即将入队的匹配任务。

不要尝试取消匹配的任务,而是更改任务,以便在其站点 ID 已被取消的情况下,任务变为 no-op。您可以在 ConcurrentHashMap 中记录站点 ID 的取消。任何任务都会在开始工作之前检查这张地图,如果站点 ID 存在,它只会 return。向地图添加一个站点 ID 会立即生效,确保不会在该站点 ID 上开始任何新任务。 (已经开始的任务将 运行 完成。)任何 in-flight 任务最终都会从队列中耗尽,而不会导致任何实际工作发生。