Java ReentrantLock 和条件 |生产者完成工作,消费者陷入困境
Java ReentrantLock and Condition | producers finish work, consumer gets stuck
一般信息:
三个 reader 线程从一个文件中随机读取块,每个块都有一个 ID,然后写入一个普通的 ArrayList。一旦将具有所需 ID 的块添加到列表中,写入器线程就会写入输出文件。
出于这个原因,我编写了一个 BlockingChunkList,它应该同步 add() 和 getNextChunk() 方法。
我在一种情况下使用 synchronized + notify + notifyAll,在另一种情况下使用 synchronized list。
我用ReentrantLock和Condition的时候没成功。 writer-thread 只写了四个块然后就卡住了。
为什么它可能不起作用:
我怀疑一旦 readers 完成,作者就无法取回锁。但是我希望每次有东西要写时(可用=真)然后应该调用编写器线程。他们似乎忽略 hasAccess.await() when available is true.
它应该如何工作:
读取线程只调用 add 方法,只有在有东西要写入(可用)时才释放写入线程。他们也会在 available=true 时阻止自己。当作者通过调用 hasAccess.signalAll() 写了一些东西时,这个锁就会被释放
写入线程仅调用 getNextChunk() 方法,并在写入块时释放其他线程。他在 available=false 时封锁自己,然后他被 readers 释放。
问题:
读取线程完成它们的工作,写入线程只写入前 4 个块。我希望作者总是在 available=true 时被调用。
我不需要确切的解决方案,也很感谢提示,因为我想我遗漏了一些东西。所以:我错过了什么?
谢谢
编辑:并发仅在已发布的 class 中处理。主要方法只启动踏板。
编辑 2:这是我第一次尝试并发。我知道 ArrayList 不是线程安全的。我想通过使用 ReentrantLock 和 Condition 来理解这些概念。 BASIC 的想法是阻止 reader 或 writer available 是 true 还是 false。
import java.util.ArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingChunkQueue {
private final ArrayList<Chunk> chunks;
private volatile int currentID = 0; // first ID for which the writer needs to wait
private volatile boolean available; // true if the ID is in the list
private final Lock listLock;
private final Condition hasAccess;
public BlockingChunkQueue(){
chunks = new ArrayList<>();
listLock = new ReentrantLock();
hasAccess = listLock.newCondition();
available = false;
}
/*
* While there is a chunk which might be written to the output (=correct ID) then wait for access.
* Then add the chunk and if the added chunk has the ID for which the other thread is waiting, then,
* then available = true and signal the waiting thread.
*
*
*/
public void add(Chunk chunk) throws InterruptedException{
listLock.lock();
try{
while(available){
hasAccess.await(); // reader block yourself until you get the lock back
}
chunks.add(chunk);
if(chunk.getId() == currentID){
available = true;
hasAccess.signalAll();
}
}finally{
listLock.unlock();
}
}
/*
* If the chunk is not available then wait.
* If it becomes available then increment the ID, remove it from the list, signal the reader-threads and set available false.
* return the chunk which will be written to the output.
*/
public Chunk getNextChunk() throws InterruptedException {
listLock.lock();
try{
while(!available){
hasAccess.await(); // block yourself until u can write something
}
for(Chunk c : chunks){
if(c.getId() == currentID){
currentID++;
chunks.remove(c);
hasAccess.signalAll(); //signal the readers
available = false;
return c;
}
}
}finally{
listLock.unlock();
}
return null;
}
public int getcurrentID(){ return currentID;}
public boolean isEmpty(){ return chunks.isEmpty(); }
public int size(){return chunks.size();}
}
解决方案:
处理线程没有错。结果证明这是我这边的逻辑错误。写入线程被卡住了,因为他没有机会检查必要的 ID,因为写入器是随机读取块的。感谢您的帮助。
这里有几个问题。
volatile 变量 available
的目的是什么,它只在 lock
被保存时被读取或改变?
isEmtpy
和 size
方法调用 chunks
上的方法而不持有 lock
。 ArrayList
不是线程安全的。无法预测这些调用的行为。
您可能卡住的一个原因是如果在调用 getNextChunk 之前添加了多个块。
在你的循环中找到你设置为 false 的 "current",但它实际上可能已经在列表中:
for(Chunk c : chunks){
if(c.getId() == currentID){
currentID++;
chunks.remove(c);
hasAccess.signalAll(); //signal the readers
available = false;
return c;
}
}
考虑将您的块存储在 Map<Integer,Chunk>
中,以便可以轻松查看标识符是否存在块。
一般信息: 三个 reader 线程从一个文件中随机读取块,每个块都有一个 ID,然后写入一个普通的 ArrayList。一旦将具有所需 ID 的块添加到列表中,写入器线程就会写入输出文件。
出于这个原因,我编写了一个 BlockingChunkList,它应该同步 add() 和 getNextChunk() 方法。
我在一种情况下使用 synchronized + notify + notifyAll,在另一种情况下使用 synchronized list。
我用ReentrantLock和Condition的时候没成功。 writer-thread 只写了四个块然后就卡住了。
为什么它可能不起作用: 我怀疑一旦 readers 完成,作者就无法取回锁。但是我希望每次有东西要写时(可用=真)然后应该调用编写器线程。他们似乎忽略 hasAccess.await() when available is true.
它应该如何工作: 读取线程只调用 add 方法,只有在有东西要写入(可用)时才释放写入线程。他们也会在 available=true 时阻止自己。当作者通过调用 hasAccess.signalAll() 写了一些东西时,这个锁就会被释放 写入线程仅调用 getNextChunk() 方法,并在写入块时释放其他线程。他在 available=false 时封锁自己,然后他被 readers 释放。
问题: 读取线程完成它们的工作,写入线程只写入前 4 个块。我希望作者总是在 available=true 时被调用。
我不需要确切的解决方案,也很感谢提示,因为我想我遗漏了一些东西。所以:我错过了什么?
谢谢
编辑:并发仅在已发布的 class 中处理。主要方法只启动踏板。 编辑 2:这是我第一次尝试并发。我知道 ArrayList 不是线程安全的。我想通过使用 ReentrantLock 和 Condition 来理解这些概念。 BASIC 的想法是阻止 reader 或 writer available 是 true 还是 false。
import java.util.ArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingChunkQueue {
private final ArrayList<Chunk> chunks;
private volatile int currentID = 0; // first ID for which the writer needs to wait
private volatile boolean available; // true if the ID is in the list
private final Lock listLock;
private final Condition hasAccess;
public BlockingChunkQueue(){
chunks = new ArrayList<>();
listLock = new ReentrantLock();
hasAccess = listLock.newCondition();
available = false;
}
/*
* While there is a chunk which might be written to the output (=correct ID) then wait for access.
* Then add the chunk and if the added chunk has the ID for which the other thread is waiting, then,
* then available = true and signal the waiting thread.
*
*
*/
public void add(Chunk chunk) throws InterruptedException{
listLock.lock();
try{
while(available){
hasAccess.await(); // reader block yourself until you get the lock back
}
chunks.add(chunk);
if(chunk.getId() == currentID){
available = true;
hasAccess.signalAll();
}
}finally{
listLock.unlock();
}
}
/*
* If the chunk is not available then wait.
* If it becomes available then increment the ID, remove it from the list, signal the reader-threads and set available false.
* return the chunk which will be written to the output.
*/
public Chunk getNextChunk() throws InterruptedException {
listLock.lock();
try{
while(!available){
hasAccess.await(); // block yourself until u can write something
}
for(Chunk c : chunks){
if(c.getId() == currentID){
currentID++;
chunks.remove(c);
hasAccess.signalAll(); //signal the readers
available = false;
return c;
}
}
}finally{
listLock.unlock();
}
return null;
}
public int getcurrentID(){ return currentID;}
public boolean isEmpty(){ return chunks.isEmpty(); }
public int size(){return chunks.size();}
}
解决方案: 处理线程没有错。结果证明这是我这边的逻辑错误。写入线程被卡住了,因为他没有机会检查必要的 ID,因为写入器是随机读取块的。感谢您的帮助。
这里有几个问题。
volatile 变量 available
的目的是什么,它只在 lock
被保存时被读取或改变?
isEmtpy
和 size
方法调用 chunks
上的方法而不持有 lock
。 ArrayList
不是线程安全的。无法预测这些调用的行为。
您可能卡住的一个原因是如果在调用 getNextChunk 之前添加了多个块。
在你的循环中找到你设置为 false 的 "current",但它实际上可能已经在列表中:
for(Chunk c : chunks){
if(c.getId() == currentID){
currentID++;
chunks.remove(c);
hasAccess.signalAll(); //signal the readers
available = false;
return c;
}
}
考虑将您的块存储在 Map<Integer,Chunk>
中,以便可以轻松查看标识符是否存在块。