模仿线程池不能正常工作

Imitation of Thread Pool doesn't work correctly

我一直在尝试使用活动对象简单地实现线程池。

这是我的 Main:

public static void main(String[] args){
   MyThreadPool tp = new MyThreadPool(100,3);
        tp.execute(()->{
            try { Thread.sleep(5*1000); } catch (InterruptedException e) {}
            System.out.println("42");
        });
        tp.shutDown();
}

shutDown 方法通常首先通过 Main 调用,因此意外地使活动对象保持“活动状态”,但有时我会得到想要的结果。 知道为什么结果不确定吗?

您可以在下面看到其余的类

public class MyThreadPool {

    ArrayBlockingQueue<Runnable> q;
    ArrayBlockingQueue<ActiveObject> activeObjects;
    volatile boolean stop;
    AtomicInteger count;
    Thread t;
    Runnable stopTask;

    public MyThreadPool(int capacity, int maxThreads) {
        activeObjects = new ArrayBlockingQueue<>(maxThreads);
        q = new ArrayBlockingQueue<>(capacity);
        count = new AtomicInteger(0);
        stopTask = ()->stop = true;

        t=new Thread(()->{
            //System.out.println("Thread-Pool Started");
            while(!stop){
                //if queue is empty it is gonna be a blocking call
                try {
                    Runnable task = q.take();
                    if(task==stopTask)
                        stopTask.run();
                    else
                        //size() is atomic integer
                        if (count.get() < maxThreads) {
                            ActiveObject a = new ActiveObject(capacity);
                            activeObjects.put(a);
                            count.incrementAndGet();
                            a.execute(task);
                        }
                        //we will assign the next task to the least busy ActiveObject
                        else {
                            int minSize = Integer.MAX_VALUE;
                            ActiveObject choice = null;
                            for (ActiveObject a : activeObjects) {
                                if (a.size() < minSize) {
                                    minSize = a.size();
                                    choice = a;
                                }
                            }
                            choice.execute(task);
                        }

                } catch (InterruptedException e) { }
            }
            //System.out.println("Thread-Pool Ended");
        });
       t.start();
    }

    //execute returns right away - just puts into the queue
    public void execute(Runnable r ){
        // if capacity is full it is gonna be a blocking call
        if(!stop)
            try { q.put(r); } catch (InterruptedException e) { }
    }

    public void shutDownNow(){
        activeObjects.forEach(a->a.shutDownNow());
        stop = true;
        t.interrupt();
    }
    public void shutDown(){
        activeObjects.forEach(a->a.shutDown());
        execute(stopTask);
    }
public class ActiveObject {

    ArrayBlockingQueue<Runnable> q;
    volatile boolean stop;
    Thread t;

    public ActiveObject(int capacity) {
        q = new ArrayBlockingQueue<>(capacity);
        t=new Thread(()->{
            //System.out.println("Active Object Started");
            while(!stop){
                //if queue is empty it is gonna be a blocking call
                try {
                    q.take().run();
                } catch (InterruptedException e) { }
            }
            //System.out.println("Active Object Ended");
        });

       t.start();
    }

    //execute returns right away - just puts into the queue
    public void execute(Runnable r ){
        // if capacity is full it is gonna be a blocking call
        if(!stop)
            try { q.put(r); } catch (InterruptedException e) { }
    }

    public void shutDownNow(){
        stop = true;
        t.interrupt();
    }
    public void shutDown(){
        execute(()->stop=true);
    }

    public int size(){
        return q.size();
    }
}

在您的 main 方法中,您创建了一个线程池(它还创建并启动了 tp.t 线程),将一个任务排入 tp.q,然后调用 tp.shutDown():

MyThreadPool tp = new MyThreadPool(100, 3);
tp.execute(() -> {...});
tp.shutDown();

假设主线程中的tp.shutDown()MyThreadPool.t线程处理入队任务之前执行:

activeObjects.forEach(a -> a.shutDown());
execute(stopTask);

此处 activeObjects 为空,您将 stopTask 入队 tp.q,然后 main 线程结束。

现在我们只有 MyThreadPool.t 个线程,让我们看看它做了什么:

while (!stop) {
  try {
    Runnable task = q.take();
    if (task == stopTask)
      stopTask.run();
    else
    if (count.get() < maxThreads) {
      ActiveObject a = new ActiveObject(capacity);
      activeObjects.put(a);
      count.incrementAndGet();
      a.execute(task);
    }
    else {
      ...
    }
  } catch (InterruptedException e) {
  }
}

此时q包含2个任务:一个普通任务和stopTask

在第一个循环迭代中,正常任务取自 q,并提供给新创建的 ActiveObject 进行处理:

ActiveObject a = new ActiveObject(capacity);
activeObjects.put(a);
count.incrementAndGet();
a.execute(task);

new ActiveObject() 还创建并启动了自己的内部 ActiveObject.t 线程。

第二次循环迭代过程stopTask:

if (task == stopTask)
  stopTask.run();

设置stop = true.
结果,下一个检查 while (!stop) returns falseMyThreadPool.t 线程完成。

现在只有 ActiveObject.t 个线程,还没有停止:

while (!stop) {
  try {
    q.take().run();
  } catch (InterruptedException e) {
  }
} 

此处线程将永远等待 q.take()