中断线程池中的 BlockingQueue take()

Interrupting BlockingQueue take() in a thread pool

我有一个创建工人的线程池,工人从 BlockingQueue 中获取工作。 线程等待队列中的 take()。 即使在显式调用 运行 线程的线程中断方法时,它们仍在等待 take()。什么是处理 blockingqueue

的正确方法
public class ThreadPoolGen {
    static final Logger LOG = Logger.getLogger(ThreadPoolGen.class);

    private LinkedBlockingQueue<Runnable> queue;
    private int threadCount;
    private Worker[] workers;
    private Thread[] workerThreads;

    public ThreadPoolGen(int count) throws CountException{

        if(isValidCount(count))
            this.threadCount = count;
        else
            throw new CountException("Invalid Thread Count");

        workers = new Worker[count];
        workerThreads = new Thread[count];
        queue = new LinkedBlockingQueue<Runnable>();
        startThreads();
    }

    public boolean execute(Runnable task){
        return queue.offer(task);
    }

    private void startThreads(){
        synchronized (this) {

            for(int i=0;i<threadCount;i++){

                workers[i] = new Worker();
                workerThreads[i] = new Thread(workers[i]);
                workerThreads[i].start();
            }
        }
    }

    public boolean shutDown(){
        try{
            for(Worker w: workers){

                w.thread.interrupt();

            }

            queue.clear();

            for(Thread workerThread : workerThreads){

                workerThread.interrupt();

            }

            return true;

        }catch(Exception e){
            LOG.debug(Thread.currentThread()+": Worker Thread Shutdown Failed");
            return false;

        }

    }

    private boolean isValidCount(int count){

        if(count<Integer.MAX_VALUE && count>0)
            return true;
        else
            return false;
    }

    private class Worker implements Runnable{

        final Thread thread;

        private Worker(){
            this.thread = Thread.currentThread();
        }

        @Override
        public void run() {
            try{
                while(true){
                    try{
                        Runnable r = queue.take();
                        r.run();
                    }catch(InterruptedException interrupt){
                        LOG.debug("Interrupted exception in: "+thread.getName());
                    }
                }
            }catch(Exception intr){

                this.thread.interrupt();

            }finally{

                this.thread.interrupt();
            }
        }
    }
}

调用 class :

public class Runner {
    public static void main(String[] args) {
        try {
            System.out.println("BeforeLaunch");
            ThreadPoolGen gen = new ThreadPoolGen(10);
            gen.execute(new Runnable() {

                @Override
                public void run() {
                    System.out.println("Inside Runnable");

                }
            });

            gen.shutDown();
        } catch (CountException ce) {
        } catch (Exception e) {
        }

    }

}

您正在 while 循环中捕获异常

while (true) {
    try {
        Runnable r = queue.take();
        r.run();
    } catch (InterruptedException interrupt) {
        LOG.debug("Interrupted exception in: " + thread.getName());
    }
}

任何时候你中断这个线程,它都会再次循环。摆脱这个 try-catch。让外层(while 之外)处理 InterruptedException.

请注意,当您的线程正在执行 run() 时,您可能会得到 interrupt,在这种情况下,InterruptedException 可能不会按照您的预期执行。您应该设置一个标志,以便在 Runnable#run() 完成后同一线程不会再次循环。

I have a thread pool which creates workers and the workers take the jobs from a BlockingQueue. The threads wait on take() from the queue. Even on explicitly calling the thread interrupt method for the running threads, they are still waiting on take(). What is right way of dealing with BlockingQueue.

在我看来,您正在复制 ExecutorService 的行为。是否有一个原因?这是 tutorial for them:

ExecutorService threadPool = Executors.newFixedThreadPool(count);
...
threadPool.submit(new Runnable() ...);

有时 运行 线程需要保留上下文,但您的 类 似乎仍然过于复杂。您仍然可以使用 ExecutorService 在生产者和消费者线程之间共享 BlockingQueue。您可以在完成后中断线程,但也可以将 countnull 对象推入队列,并让您​​的工作线程在看到 null.

时退出
public class Worker implements Runnable {
     // some sort of context needed to be held by each runner
     public void run() {
         while (true) {
             Work work = sharedQueue.take();
             if (work == null) {
                 return;
             }
             // do the work ...
         }
     }
}