Java ReentrantLock 和条件 |生产者完成工作,消费者陷入困境

Java ReentrantLock and Condition | producers finish work, consumer gets stuck

一般信息: 三个 reader 线程从一个文件中随机读取块,每个块都有一个 ID,然后写入一个普通的 ArrayList。一旦将具有所需 ID 的块添加到列表中,写入器线程就会写入输出文件。

出于这个原因,我编写了一个 BlockingChunkList,它应该同步 add() 和 getNextChunk() 方法。

我在一种情况下使用 synchronized + notify + notifyAll,在另一种情况下使用 synchronized list。

我用ReentrantLock和Condition的时候没成功。 writer-thread 只写了四个块然后就卡住了。

为什么它可能不起作用: 我怀疑一旦 readers 完成,作者就无法取回锁。但是我希望每次有东西要写时(可用=真)然后应该调用编写器线程。他们似乎忽略 hasAccess.await() when available is true.

它应该如何工作: 读取线程只调用 add 方法,只有在有东西要写入(可用)时才释放写入线程。他们也会在 available=true 时阻止自己。当作者通过调用 hasAccess.signalAll() 写了一些东西时,这个锁就会被释放 写入线程仅调用 getNextChunk() 方法,并在写入块时释放其他线程。他在 available=false 时封锁自己,然后他被 readers 释放。

问题: 读取线程完成它们的工作,写入线程只写入前 4 个块。我希望作者总是在 available=true 时被调用。

我不需要确切的解决方案,也很感谢提示,因为我想我遗漏了一些东西。所以:我错过了什么?

谢谢

编辑:并发仅在已发布的 class 中处理。主要方法只启动踏板。 编辑 2:这是我第一次尝试并发。我知道 ArrayList 不是线程安全的。我想通过使用 ReentrantLock 和 Condition 来理解这些概念。 BASIC 的想法是阻止 reader 或 writer available 是 true 还是 false。

import java.util.ArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

    public class BlockingChunkQueue {

        private final ArrayList<Chunk> chunks;
        private volatile int currentID = 0; // first ID for which the writer needs to wait
        private volatile boolean available; // true if the ID is in the list

        private final Lock listLock;
        private final Condition hasAccess;

        public BlockingChunkQueue(){ 
            chunks = new ArrayList<>(); 
            listLock = new ReentrantLock();
            hasAccess = listLock.newCondition();
            available = false;
        }

        /*
         * While there is a chunk which might be written to the output (=correct ID) then wait for access.
         * Then add the chunk and if the added chunk has the ID for which the other thread is waiting, then,
         * then available = true and signal the waiting thread.
         * 
         * 
         */
        public void add(Chunk chunk) throws InterruptedException{
            listLock.lock();
            try{
                while(available){
                    hasAccess.await(); // reader block yourself until you get the lock back
                }
                chunks.add(chunk);

            if(chunk.getId() == currentID){
                available = true;
                hasAccess.signalAll();
            }
            }finally{
                listLock.unlock();
            }
        }

        /*
         * If the chunk is not available then wait.
         * If it becomes available then increment the ID, remove it from the list, signal the reader-threads and set available false.
         * return the chunk which will be written to the output.
         */
        public Chunk getNextChunk() throws InterruptedException {
            listLock.lock();
            try{
                while(!available){
                    hasAccess.await(); // block yourself until u can write something
                }

                for(Chunk c : chunks){
                    if(c.getId() == currentID){
                        currentID++;
                        chunks.remove(c);
                        hasAccess.signalAll(); //signal the readers
                        available = false;
                        return c;
                    }
                }

            }finally{
                listLock.unlock();
            }
            return null;
        }

        public int getcurrentID(){ return currentID;}

        public boolean isEmpty(){ return chunks.isEmpty(); }

        public int size(){return chunks.size();}
    }

解决方案: 处理线程没有错。结果证明这是我这边的逻辑错误。写入线程被卡住了,因为他没有机会检查必要的 ID,因为写入器是随机读取块的。感谢您的帮助。

这里有几个问题。

volatile 变量 available 的目的是什么,它只在 lock 被保存时被读取或改变?

isEmtpysize 方法调用 chunks 上的方法而不持有 lockArrayList 不是线程安全的。无法预测这些调用的行为。

您可能卡住的一个原因是如果在调用 getNextChunk 之前添加了多个块。

在你的循环中找到你设置为 false 的 "current",但它实际上可能已经在列表中:

            for(Chunk c : chunks){
                if(c.getId() == currentID){
                    currentID++;
                    chunks.remove(c);
                    hasAccess.signalAll(); //signal the readers
                    available = false;
                    return c;
                }
            }

考虑将您的块存储在 Map<Integer,Chunk> 中,以便可以轻松查看标识符是否存在块。