信号量实现的面向生产者消费者的线程池

Semaphore-implemented Producer-Consumer oriented thread pool

我目前正在进行一项教育任务,其中我必须实现一个仅信号量的线程安全线程池。

我不能在作业中使用:Synchronize wait notify sleep 或任何线程安全的 API。

首先不要过多地了解我的代码:

设计本身:

共享:

工作线程:

一般信息:

所以我们混合了一个生产者、一个管理者和几个消费者。

我的问题:

在应用程序的运行期间,函数 dequeue_worker() returns 一个空工作线程,即使在已知没有可用的工作线程时放置信号量以保护对队列的访问也是如此。

我有 "solved" 如果它绘制一个空线程递归调用 dequeue_worker() 的问题,但这样做是假设获取信号量许可永远丢失。然而,当我将工人数量限制为 1 时,工人不会永远被阻止。

1) 我原来设计的断点是什么?

2) 为什么我的 "solution" 没有进一步破坏设计?!

代码片段:

// only gets called by Worker threads: enqueue_worker(this);
    private void enqueue_worker(Worker worker) {
       available_queue.add(worker);
       available.release();
    }

// only gets called by App-Main (a single thread)
    public void enqueue_task(Query query) {
        tasks_queue.add(query);
        tasks.release();
    }

// only gets called by Manager(a single Thread) 
    private Worker dequeue_worker() {
        Worker worker = null;
        try {
            available.acquire();
            worker = available_queue.poll();
        } catch (InterruptedException e) {
            // shouldn't happen
        } // **** the solution: ****
        if (worker==null) worker = dequeue_worker(); // TODO: find out why
        return worker;
    }

// only gets called by Manager(a single Thread) 
    private Query dequeue_task() {
        Query query = null;
        try {
            tasks.acquire();
            query = tasks_queue.poll();
        } catch (InterruptedException e) {
            // shouldn't happen
        } 
        return query;
    }

// gets called by Manager (a single thread)
    private void execute() { // check if task is available and executes it
        Worker worker = dequeue_worker(); // available.down()
        Query query = dequeue_task(); //task.down()
        worker.setData(query);
        worker.blocked.release();
    }

最后是 Worker 的 Run() 方法:

while (true) { // main infinite loop

                enqueue_worker(this);
                acquire(); // blocked.acquire();
                <C.S>
                available.release();
            }

您正在调用 available.release() 两次,一次在 enqueue_worker 中,第二次在主循环中。