模仿线程池不能正常工作
Imitation of Thread Pool doesn't work correctly
我一直在尝试使用活动对象简单地实现线程池。
这是我的 Main:
public static void main(String[] args){
MyThreadPool tp = new MyThreadPool(100,3);
tp.execute(()->{
try { Thread.sleep(5*1000); } catch (InterruptedException e) {}
System.out.println("42");
});
tp.shutDown();
}
shutDown 方法通常首先通过 Main 调用,因此意外地使活动对象保持“活动状态”,但有时我会得到想要的结果。
知道为什么结果不确定吗?
您可以在下面看到其余的类:
public class MyThreadPool {
ArrayBlockingQueue<Runnable> q;
ArrayBlockingQueue<ActiveObject> activeObjects;
volatile boolean stop;
AtomicInteger count;
Thread t;
Runnable stopTask;
public MyThreadPool(int capacity, int maxThreads) {
activeObjects = new ArrayBlockingQueue<>(maxThreads);
q = new ArrayBlockingQueue<>(capacity);
count = new AtomicInteger(0);
stopTask = ()->stop = true;
t=new Thread(()->{
//System.out.println("Thread-Pool Started");
while(!stop){
//if queue is empty it is gonna be a blocking call
try {
Runnable task = q.take();
if(task==stopTask)
stopTask.run();
else
//size() is atomic integer
if (count.get() < maxThreads) {
ActiveObject a = new ActiveObject(capacity);
activeObjects.put(a);
count.incrementAndGet();
a.execute(task);
}
//we will assign the next task to the least busy ActiveObject
else {
int minSize = Integer.MAX_VALUE;
ActiveObject choice = null;
for (ActiveObject a : activeObjects) {
if (a.size() < minSize) {
minSize = a.size();
choice = a;
}
}
choice.execute(task);
}
} catch (InterruptedException e) { }
}
//System.out.println("Thread-Pool Ended");
});
t.start();
}
//execute returns right away - just puts into the queue
public void execute(Runnable r ){
// if capacity is full it is gonna be a blocking call
if(!stop)
try { q.put(r); } catch (InterruptedException e) { }
}
public void shutDownNow(){
activeObjects.forEach(a->a.shutDownNow());
stop = true;
t.interrupt();
}
public void shutDown(){
activeObjects.forEach(a->a.shutDown());
execute(stopTask);
}
public class ActiveObject {
ArrayBlockingQueue<Runnable> q;
volatile boolean stop;
Thread t;
public ActiveObject(int capacity) {
q = new ArrayBlockingQueue<>(capacity);
t=new Thread(()->{
//System.out.println("Active Object Started");
while(!stop){
//if queue is empty it is gonna be a blocking call
try {
q.take().run();
} catch (InterruptedException e) { }
}
//System.out.println("Active Object Ended");
});
t.start();
}
//execute returns right away - just puts into the queue
public void execute(Runnable r ){
// if capacity is full it is gonna be a blocking call
if(!stop)
try { q.put(r); } catch (InterruptedException e) { }
}
public void shutDownNow(){
stop = true;
t.interrupt();
}
public void shutDown(){
execute(()->stop=true);
}
public int size(){
return q.size();
}
}
在您的 main 方法中,您创建了一个线程池(它还创建并启动了 tp.t
线程),将一个任务排入 tp.q
,然后调用 tp.shutDown()
:
MyThreadPool tp = new MyThreadPool(100, 3);
tp.execute(() -> {...});
tp.shutDown();
假设主线程中的tp.shutDown()
在MyThreadPool.t
线程处理入队任务之前执行:
activeObjects.forEach(a -> a.shutDown());
execute(stopTask);
此处 activeObjects
为空,您将 stopTask
入队 tp.q
,然后 main
线程结束。
现在我们只有 MyThreadPool.t
个线程,让我们看看它做了什么:
while (!stop) {
try {
Runnable task = q.take();
if (task == stopTask)
stopTask.run();
else
if (count.get() < maxThreads) {
ActiveObject a = new ActiveObject(capacity);
activeObjects.put(a);
count.incrementAndGet();
a.execute(task);
}
else {
...
}
} catch (InterruptedException e) {
}
}
此时q
包含2个任务:一个普通任务和stopTask
。
在第一个循环迭代中,正常任务取自 q
,并提供给新创建的 ActiveObject
进行处理:
ActiveObject a = new ActiveObject(capacity);
activeObjects.put(a);
count.incrementAndGet();
a.execute(task);
new ActiveObject()
还创建并启动了自己的内部 ActiveObject.t
线程。
第二次循环迭代过程stopTask
:
if (task == stopTask)
stopTask.run();
设置stop = true
.
结果,下一个检查 while (!stop)
returns false
和 MyThreadPool.t
线程完成。
现在只有 ActiveObject.t
个线程,还没有停止:
while (!stop) {
try {
q.take().run();
} catch (InterruptedException e) {
}
}
此处线程将永远等待 q.take()
。
我一直在尝试使用活动对象简单地实现线程池。
这是我的 Main:
public static void main(String[] args){
MyThreadPool tp = new MyThreadPool(100,3);
tp.execute(()->{
try { Thread.sleep(5*1000); } catch (InterruptedException e) {}
System.out.println("42");
});
tp.shutDown();
}
shutDown 方法通常首先通过 Main 调用,因此意外地使活动对象保持“活动状态”,但有时我会得到想要的结果。 知道为什么结果不确定吗?
您可以在下面看到其余的类:
public class MyThreadPool {
ArrayBlockingQueue<Runnable> q;
ArrayBlockingQueue<ActiveObject> activeObjects;
volatile boolean stop;
AtomicInteger count;
Thread t;
Runnable stopTask;
public MyThreadPool(int capacity, int maxThreads) {
activeObjects = new ArrayBlockingQueue<>(maxThreads);
q = new ArrayBlockingQueue<>(capacity);
count = new AtomicInteger(0);
stopTask = ()->stop = true;
t=new Thread(()->{
//System.out.println("Thread-Pool Started");
while(!stop){
//if queue is empty it is gonna be a blocking call
try {
Runnable task = q.take();
if(task==stopTask)
stopTask.run();
else
//size() is atomic integer
if (count.get() < maxThreads) {
ActiveObject a = new ActiveObject(capacity);
activeObjects.put(a);
count.incrementAndGet();
a.execute(task);
}
//we will assign the next task to the least busy ActiveObject
else {
int minSize = Integer.MAX_VALUE;
ActiveObject choice = null;
for (ActiveObject a : activeObjects) {
if (a.size() < minSize) {
minSize = a.size();
choice = a;
}
}
choice.execute(task);
}
} catch (InterruptedException e) { }
}
//System.out.println("Thread-Pool Ended");
});
t.start();
}
//execute returns right away - just puts into the queue
public void execute(Runnable r ){
// if capacity is full it is gonna be a blocking call
if(!stop)
try { q.put(r); } catch (InterruptedException e) { }
}
public void shutDownNow(){
activeObjects.forEach(a->a.shutDownNow());
stop = true;
t.interrupt();
}
public void shutDown(){
activeObjects.forEach(a->a.shutDown());
execute(stopTask);
}
public class ActiveObject {
ArrayBlockingQueue<Runnable> q;
volatile boolean stop;
Thread t;
public ActiveObject(int capacity) {
q = new ArrayBlockingQueue<>(capacity);
t=new Thread(()->{
//System.out.println("Active Object Started");
while(!stop){
//if queue is empty it is gonna be a blocking call
try {
q.take().run();
} catch (InterruptedException e) { }
}
//System.out.println("Active Object Ended");
});
t.start();
}
//execute returns right away - just puts into the queue
public void execute(Runnable r ){
// if capacity is full it is gonna be a blocking call
if(!stop)
try { q.put(r); } catch (InterruptedException e) { }
}
public void shutDownNow(){
stop = true;
t.interrupt();
}
public void shutDown(){
execute(()->stop=true);
}
public int size(){
return q.size();
}
}
在您的 main 方法中,您创建了一个线程池(它还创建并启动了 tp.t
线程),将一个任务排入 tp.q
,然后调用 tp.shutDown()
:
MyThreadPool tp = new MyThreadPool(100, 3);
tp.execute(() -> {...});
tp.shutDown();
假设主线程中的tp.shutDown()
在MyThreadPool.t
线程处理入队任务之前执行:
activeObjects.forEach(a -> a.shutDown());
execute(stopTask);
此处 activeObjects
为空,您将 stopTask
入队 tp.q
,然后 main
线程结束。
现在我们只有 MyThreadPool.t
个线程,让我们看看它做了什么:
while (!stop) {
try {
Runnable task = q.take();
if (task == stopTask)
stopTask.run();
else
if (count.get() < maxThreads) {
ActiveObject a = new ActiveObject(capacity);
activeObjects.put(a);
count.incrementAndGet();
a.execute(task);
}
else {
...
}
} catch (InterruptedException e) {
}
}
此时q
包含2个任务:一个普通任务和stopTask
。
在第一个循环迭代中,正常任务取自 q
,并提供给新创建的 ActiveObject
进行处理:
ActiveObject a = new ActiveObject(capacity);
activeObjects.put(a);
count.incrementAndGet();
a.execute(task);
new ActiveObject()
还创建并启动了自己的内部 ActiveObject.t
线程。
第二次循环迭代过程stopTask
:
if (task == stopTask)
stopTask.run();
设置stop = true
.
结果,下一个检查 while (!stop)
returns false
和 MyThreadPool.t
线程完成。
现在只有 ActiveObject.t
个线程,还没有停止:
while (!stop) {
try {
q.take().run();
} catch (InterruptedException e) {
}
}
此处线程将永远等待 q.take()
。