信号量实现的面向生产者消费者的线程池
Semaphore-implemented Producer-Consumer oriented thread pool
我目前正在进行一项教育任务,其中我必须实现一个仅信号量的线程安全线程池。
我不能在作业中使用:Synchronize
wait
notify
sleep
或任何线程安全的 API。
首先不要过多地了解我的代码:
- 实现了一个线程安全的队列(没有两个线程可以同时queue\dequeue)(我用
ConcurrentLinkedQueue
测试了这个问题,问题仍然存在)
设计本身:
共享:
Tasks
信号量=0
Available
信号量=0
Tasks_Queue
队列
Available_Queue
队列
工作线程:
Blocked
信号量 = 0
一般信息:
只有管理器(单线程)可以出队Tasks_Queue
和Available_Queue
只有App-Main(单线程)可以入队任务是Tasks_Queue
每个工作线程都可以将自己排入 Available_Queue
所以我们混合了一个生产者、一个管理者和几个消费者。
- 当应用程序首次启动时,每个工作线程都会启动并立即在
Available_Queue
中排队,释放 Available
信号量并在获取它的个人 Blocked
信号量时被阻止。
- 每当 App-Main 将一个新任务排入队列时,它就会释放
Task
信号量
- 每当管理器希望执行新任务时,它必须首先获取
Tasks
和 Available
信号量。
我的问题:
在应用程序的运行期间,函数 dequeue_worker()
returns 一个空工作线程,即使在已知没有可用的工作线程时放置信号量以保护对队列的访问也是如此。
我有 "solved" 如果它绘制一个空线程递归调用 dequeue_worker()
的问题,但这样做是假设获取信号量许可永远丢失。然而,当我将工人数量限制为 1 时,工人不会永远被阻止。
1) 我原来设计的断点是什么?
2) 为什么我的 "solution" 没有进一步破坏设计?!
代码片段:
// only gets called by Worker threads: enqueue_worker(this);
private void enqueue_worker(Worker worker) {
available_queue.add(worker);
available.release();
}
// only gets called by App-Main (a single thread)
public void enqueue_task(Query query) {
tasks_queue.add(query);
tasks.release();
}
// only gets called by Manager(a single Thread)
private Worker dequeue_worker() {
Worker worker = null;
try {
available.acquire();
worker = available_queue.poll();
} catch (InterruptedException e) {
// shouldn't happen
} // **** the solution: ****
if (worker==null) worker = dequeue_worker(); // TODO: find out why
return worker;
}
// only gets called by Manager(a single Thread)
private Query dequeue_task() {
Query query = null;
try {
tasks.acquire();
query = tasks_queue.poll();
} catch (InterruptedException e) {
// shouldn't happen
}
return query;
}
// gets called by Manager (a single thread)
private void execute() { // check if task is available and executes it
Worker worker = dequeue_worker(); // available.down()
Query query = dequeue_task(); //task.down()
worker.setData(query);
worker.blocked.release();
}
最后是 Worker 的 Run()
方法:
while (true) { // main infinite loop
enqueue_worker(this);
acquire(); // blocked.acquire();
<C.S>
available.release();
}
您正在调用 available.release()
两次,一次在 enqueue_worker
中,第二次在主循环中。
我目前正在进行一项教育任务,其中我必须实现一个仅信号量的线程安全线程池。
我不能在作业中使用:Synchronize
wait
notify
sleep
或任何线程安全的 API。
首先不要过多地了解我的代码:
- 实现了一个线程安全的队列(没有两个线程可以同时queue\dequeue)(我用
ConcurrentLinkedQueue
测试了这个问题,问题仍然存在)
设计本身:
共享:
Tasks
信号量=0Available
信号量=0Tasks_Queue
队列Available_Queue
队列
工作线程:
Blocked
信号量 = 0
一般信息:
只有管理器(单线程)可以出队
Tasks_Queue
和Available_Queue
只有App-Main(单线程)可以入队任务是
Tasks_Queue
每个工作线程都可以将自己排入
Available_Queue
所以我们混合了一个生产者、一个管理者和几个消费者。
- 当应用程序首次启动时,每个工作线程都会启动并立即在
Available_Queue
中排队,释放Available
信号量并在获取它的个人Blocked
信号量时被阻止。 - 每当 App-Main 将一个新任务排入队列时,它就会释放
Task
信号量 - 每当管理器希望执行新任务时,它必须首先获取
Tasks
和Available
信号量。
我的问题:
在应用程序的运行期间,函数 dequeue_worker()
returns 一个空工作线程,即使在已知没有可用的工作线程时放置信号量以保护对队列的访问也是如此。
我有 "solved" 如果它绘制一个空线程递归调用 dequeue_worker()
的问题,但这样做是假设获取信号量许可永远丢失。然而,当我将工人数量限制为 1 时,工人不会永远被阻止。
1) 我原来设计的断点是什么?
2) 为什么我的 "solution" 没有进一步破坏设计?!
代码片段:
// only gets called by Worker threads: enqueue_worker(this);
private void enqueue_worker(Worker worker) {
available_queue.add(worker);
available.release();
}
// only gets called by App-Main (a single thread)
public void enqueue_task(Query query) {
tasks_queue.add(query);
tasks.release();
}
// only gets called by Manager(a single Thread)
private Worker dequeue_worker() {
Worker worker = null;
try {
available.acquire();
worker = available_queue.poll();
} catch (InterruptedException e) {
// shouldn't happen
} // **** the solution: ****
if (worker==null) worker = dequeue_worker(); // TODO: find out why
return worker;
}
// only gets called by Manager(a single Thread)
private Query dequeue_task() {
Query query = null;
try {
tasks.acquire();
query = tasks_queue.poll();
} catch (InterruptedException e) {
// shouldn't happen
}
return query;
}
// gets called by Manager (a single thread)
private void execute() { // check if task is available and executes it
Worker worker = dequeue_worker(); // available.down()
Query query = dequeue_task(); //task.down()
worker.setData(query);
worker.blocked.release();
}
最后是 Worker 的 Run()
方法:
while (true) { // main infinite loop
enqueue_worker(this);
acquire(); // blocked.acquire();
<C.S>
available.release();
}
您正在调用 available.release()
两次,一次在 enqueue_worker
中,第二次在主循环中。