使用重入锁的生产者消费者不起作用
Producer Consumer using Reentrant lock not working
我正在尝试使用 ReentrantLock
实现消费者-生产者,如下所示:
class Producer implements Runnable {
private List<String> data;
private ReentrantLock lock;
Producer(List<String> data,ReentrantLock lock)
{
this.data = data;
this.lock = lock;
}
@Override
public void run() {
int counter = 0;
synchronized (lock)
{
while (true)
{
if ( data.size() < 5)
{
counter++;
data.add("writing:: "+counter);
}
else
{
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
class Consumer implements Runnable{
private List<String> data;
private ReentrantLock lock;
Consumer(List<String> data,ReentrantLock lock)
{
this.data = data;
this.lock = lock;
}
@Override
public void run() {
int counter = 0;
synchronized (lock)
{
while (true)
{
if ( data.size() > 0)
{
System.out.println("reading:: "+data.get(data.size()-1));
data.remove(data.size()-1);
}
else
{
System.out.println("Notifying..");
lock.notify();
}
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
List<String> str = new LinkedList<>();
ReentrantLock lock= new ReentrantLock();
Thread t1 = new Thread(new Producer(str,lock));
Thread t2 = new Thread(new Consumer(str,lock));
t1.start();
t2.start();
}
}
因此,它只向列表写入一次,然后消费者无限期地等待。为什么会这样?为什么 Producer 不获取锁?
您使用 lock()
and unlock()
获取和释放 ReentrantLock
,而不是 synchronized
。
正如 Antoniosss 指出的那样,这是一个死锁,其中一个线程正在等待另一个线程永远不会放弃的锁(同时两个线程正在尝试协调该锁)。这是一种解决方案:
import static java.util.Objects.requireNonNull;
import java.util.LinkedList;
import java.util.List;
class Producer implements Runnable {
private final List<String> data;
Producer(List<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
int counter = 0;
while (true) {
synchronized (data) {
if (data.size() < 5) {
counter++;
data.add("writing:: " + counter);
} else {
try {
data.wait();
} catch (InterruptedException e) {
return;
}
}
}
}
}
}
class Consumer implements Runnable {
private final List<String> data;
Consumer(List<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
while (true) {
synchronized (data) {
if (data.size() > 0) {
System.out.println("reading:: " + data.get(data.size() - 1));
data.remove(data.size() - 1);
}
data.notify();
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
List<String> data = new LinkedList<>();
Thread t1 = new Thread(new Producer(data));
Thread t2 = new Thread(new Consumer(data));
t1.start();
t2.start();
}
}
我们已经取消了锁定对象并在列表本身上同步。生产者在 while 循环内同步,其效果是一旦列表中至少有 5 个项目,生产者将等待,让出列表上的监视器。
消费者也在循环内进行同步,这很关键,因为在您的代码中,一旦获得锁,它就永远不会放弃对锁的监控。事实上,如果您首先启动了消费者(或非常不幸),则根本不会产生任何东西。离开同步块或方法时释放监视器,或者当持有监视器的线程wait()
s on it, but notify()
and notifyAll()
不释放监视器时。
消费者读取到最后一项,如果有的话,立即通知生产者并释放锁。两个注意事项:
首先,不清楚您对项目的期望顺序。您是否希望生产者生产 5,消费者消费 5,等等,或者您是否希望 5 只是一个限制,因此无法形成太大的积压(这很好,这称为背压),但消费者一有空就急着吃?此实现执行后者。
其次,消费者在释放监视器后立即尝试获取列表中的监视器。这是一种busy waiting的形式,然后consumer和producer竞相获取锁,可能consumer往往是那场比赛的赢家,一旦list就变得没有意义了空的。调用 onSpinWait
outside the synchronized block but inside the while loop in the consumer, in Java 9 or later. In earlier versions, yield
可能是明智的。但在我的测试中,代码在没有任何一个的情况下也能正常工作。
Antoniosss 提出了另一个建议,即使用 LinkedBlockingQueue,但目前的代码始终采用最后一项,使用队列会改变该行为。相反,我们可以使用双端队列(双端队列),将项目放在末尾并从末尾取出。这是它的样子:
import static java.util.Objects.requireNonNull;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
class Producer implements Runnable {
private final BlockingDeque<String> data;
Producer(BlockingDeque<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
int counter = 0;
while (true) {
counter++;
try {
data.put("writing:: " + counter);
} catch (InterruptedException e) {
break;
}
}
}
}
class Consumer implements Runnable {
private final BlockingDeque<String> data;
Consumer(BlockingDeque<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
while (true) {
try {
System.out.println("reading:: " + data.takeLast());
} catch (InterruptedException e) {
break;
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
BlockingDeque<String> data = new LinkedBlockingDeque<>(5);
Thread t1 = new Thread(new Producer(data));
Thread t2 = new Thread(new Consumer(data));
t1.start();
t2.start();
}
}
因为 LinkedBlockingDeque
是并发数据结构,我们不需要任何同步块或等待或通知。我们可以简单地尝试从双端队列 put
和 takeLast
,如果双端队列已满或为空,它将分别阻塞。双端队列的创建容量为 5,因此如果生产者超前那么多,它会向生产者施加背压,就像原来的那样。
但是,没有什么可以阻止生产者以与消费者消费元素一样快的速度生产元素,这意味着第一个元素可能必须等待任意长的时间才能被消费。我不清楚这是否是您的代码的意图。有一些方法可以实现这一点,可以通过再次引入 wait()
和 notify()
,使用 Semaphore
s,或其他方式,但我会保留它,因为它不清楚你甚至想要它。
关于 InterruptedException
的最后一个说明。如果有人在线程上调用 interrupt()
就会发生这种情况,但是唯一保持对生产者和消费者线程的引用的是 main()
方法,并且它永远不会中断它们。所以异常不应该发生在这里,但如果它以某种方式发生,我只是让生产者或消费者退出。在一个更复杂的场景中,如果线程正在休眠或处于阻塞方法(或者甚至在一个之外,如果它明确检查它),中断线程可以用作向它发出信号的一种方式,但我们在这里没有使用它。
正如你想要的那样,Producer
不停地生产价值,Consumer
不停地消耗这些价值,最终等待价值成为产品,因为有 none , 跳过 while 同步和锁定并使用 LinkedBlockingQueue
在简单生产你:
queue.put(value)
在消费者中你做
value=queue.take();
瞧,消费者将取值,并等待值队列为空。
至于你的代码:
- 您根本没有使用
ReentrantLock
。您可以将其替换为 new Object
并且输出将相同。 wait
和 notify
是 Object
的方法。
你之所以只能产生单一的价值,在于你的消费者。实际上你有这样的东西:
synchronized(lock){ // aquire lock's monitor
while(true){
lock.notify();
}
} // release lock's monitor - never doing that, thread never leaves this block
这里的问题是你的执行从不离开同步块。所以你正在调用 notify
但你不会通过退出 synchronized
块来释放 lock
监视器。生产者是 "notified" 但为了继续执行它必须重新获取 lock
的监视器 - 但它可以,因为消费者永远不会释放它。这里几乎是经典的僵局。
你可以想象房间里有几个人,只有拿着棍子的人可以说话。所以第一个坚持 syncronized(stick)
。做它必须做的事情,并决定它必须 wait
给别人,所以他打电话给 wait
并将棍子递回去。现在第二个人可以说话,做他的工作并决定给他棍子的人现在可以继续了。他跟注 notify
- 现在他必须通过离开 synchronized(stick)
块来传递棍子。如果他不这样做,第一人称无法继续 - 这就是你的情况。
@Antoniosss 是对的。您没有正确使用 ReentrantLock,而是可以将其替换为对象。如果您想改用 ReentrantLock(更新),那么我会建议如下:
package Multithreading;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
class Producer implements Runnable{
private List<String> data;
private ReentrantLock lock;
Producer(List<String> data,ReentrantLock lock)
{
this.data = data;
this.lock = lock;
}
@Override
public void run() {
int counter = 0;
while (true)
{
try {
lock.lock();
if ( data.size() < 5)
{
counter++;
data.add("writing:: " + counter);
}
}finally {
lock.unlock();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private List<String> data;
private ReentrantLock lock;
Consumer(List<String> data,ReentrantLock lock)
{
this.data = data;
this.lock = lock;
}
@Override
public void run() {
int counter = 0;
while (true)
{
try {
lock.lock();
if ( data.size() > 0)
{
System.out.println("reading:: "+data.get(data.size()-1));
data.remove(data.size()-1);
}
}finally {
lock.unlock();
}
try
{
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
List<String> str = new LinkedList<>();
ReentrantLock lock= new ReentrantLock();
Thread t1 = new Thread(new Producer(str,lock));
Thread t2 = new Thread(new Consumer(str,lock));
t1.start();
t2.start();
}
}
我删除了通知调用,但如果你真的需要一次让一个线程进入,只需使用 lock.notify()
我想指出你犯的两个错误:
ReentrantLock
的错误用法。
每个对象都可以用于内在锁,因此无需寻找特定的Lock
class。由于每个 synchronized
块都由一个方法限定并且您不需要 any enhanced means,因此 ReentrantLock
在这里是多余的。
synchronized
块的位置不正确。
一旦你进入一个同步块,在你离开之前没有人能进入那里。显然,你永远不会退出它,因为 while(true)
.
我建议您删除 ReentrantLock
并在 data
上进行同步。
@Override
public void run() {
int counter = 0;
while (true) {
synchronized (data) {
if (data.size() < 5) {
data.add("writing:: " + ++counter);
} else {
try {
data.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// we are out of the synchornized block here
// to let others use data as a monitor somewhere else
}
}
我正在尝试使用 ReentrantLock
实现消费者-生产者,如下所示:
class Producer implements Runnable {
private List<String> data;
private ReentrantLock lock;
Producer(List<String> data,ReentrantLock lock)
{
this.data = data;
this.lock = lock;
}
@Override
public void run() {
int counter = 0;
synchronized (lock)
{
while (true)
{
if ( data.size() < 5)
{
counter++;
data.add("writing:: "+counter);
}
else
{
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
class Consumer implements Runnable{
private List<String> data;
private ReentrantLock lock;
Consumer(List<String> data,ReentrantLock lock)
{
this.data = data;
this.lock = lock;
}
@Override
public void run() {
int counter = 0;
synchronized (lock)
{
while (true)
{
if ( data.size() > 0)
{
System.out.println("reading:: "+data.get(data.size()-1));
data.remove(data.size()-1);
}
else
{
System.out.println("Notifying..");
lock.notify();
}
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
List<String> str = new LinkedList<>();
ReentrantLock lock= new ReentrantLock();
Thread t1 = new Thread(new Producer(str,lock));
Thread t2 = new Thread(new Consumer(str,lock));
t1.start();
t2.start();
}
}
因此,它只向列表写入一次,然后消费者无限期地等待。为什么会这样?为什么 Producer 不获取锁?
您使用 lock()
and unlock()
获取和释放 ReentrantLock
,而不是 synchronized
。
正如 Antoniosss 指出的那样,这是一个死锁,其中一个线程正在等待另一个线程永远不会放弃的锁(同时两个线程正在尝试协调该锁)。这是一种解决方案:
import static java.util.Objects.requireNonNull;
import java.util.LinkedList;
import java.util.List;
class Producer implements Runnable {
private final List<String> data;
Producer(List<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
int counter = 0;
while (true) {
synchronized (data) {
if (data.size() < 5) {
counter++;
data.add("writing:: " + counter);
} else {
try {
data.wait();
} catch (InterruptedException e) {
return;
}
}
}
}
}
}
class Consumer implements Runnable {
private final List<String> data;
Consumer(List<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
while (true) {
synchronized (data) {
if (data.size() > 0) {
System.out.println("reading:: " + data.get(data.size() - 1));
data.remove(data.size() - 1);
}
data.notify();
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
List<String> data = new LinkedList<>();
Thread t1 = new Thread(new Producer(data));
Thread t2 = new Thread(new Consumer(data));
t1.start();
t2.start();
}
}
我们已经取消了锁定对象并在列表本身上同步。生产者在 while 循环内同步,其效果是一旦列表中至少有 5 个项目,生产者将等待,让出列表上的监视器。
消费者也在循环内进行同步,这很关键,因为在您的代码中,一旦获得锁,它就永远不会放弃对锁的监控。事实上,如果您首先启动了消费者(或非常不幸),则根本不会产生任何东西。离开同步块或方法时释放监视器,或者当持有监视器的线程wait()
s on it, but notify()
and notifyAll()
不释放监视器时。
消费者读取到最后一项,如果有的话,立即通知生产者并释放锁。两个注意事项:
首先,不清楚您对项目的期望顺序。您是否希望生产者生产 5,消费者消费 5,等等,或者您是否希望 5 只是一个限制,因此无法形成太大的积压(这很好,这称为背压),但消费者一有空就急着吃?此实现执行后者。
其次,消费者在释放监视器后立即尝试获取列表中的监视器。这是一种busy waiting的形式,然后consumer和producer竞相获取锁,可能consumer往往是那场比赛的赢家,一旦list就变得没有意义了空的。调用 onSpinWait
outside the synchronized block but inside the while loop in the consumer, in Java 9 or later. In earlier versions, yield
可能是明智的。但在我的测试中,代码在没有任何一个的情况下也能正常工作。
Antoniosss 提出了另一个建议,即使用 LinkedBlockingQueue,但目前的代码始终采用最后一项,使用队列会改变该行为。相反,我们可以使用双端队列(双端队列),将项目放在末尾并从末尾取出。这是它的样子:
import static java.util.Objects.requireNonNull;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
class Producer implements Runnable {
private final BlockingDeque<String> data;
Producer(BlockingDeque<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
int counter = 0;
while (true) {
counter++;
try {
data.put("writing:: " + counter);
} catch (InterruptedException e) {
break;
}
}
}
}
class Consumer implements Runnable {
private final BlockingDeque<String> data;
Consumer(BlockingDeque<String> data) {
this.data = requireNonNull(data);
}
@Override
public void run() {
while (true) {
try {
System.out.println("reading:: " + data.takeLast());
} catch (InterruptedException e) {
break;
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
BlockingDeque<String> data = new LinkedBlockingDeque<>(5);
Thread t1 = new Thread(new Producer(data));
Thread t2 = new Thread(new Consumer(data));
t1.start();
t2.start();
}
}
因为 LinkedBlockingDeque
是并发数据结构,我们不需要任何同步块或等待或通知。我们可以简单地尝试从双端队列 put
和 takeLast
,如果双端队列已满或为空,它将分别阻塞。双端队列的创建容量为 5,因此如果生产者超前那么多,它会向生产者施加背压,就像原来的那样。
但是,没有什么可以阻止生产者以与消费者消费元素一样快的速度生产元素,这意味着第一个元素可能必须等待任意长的时间才能被消费。我不清楚这是否是您的代码的意图。有一些方法可以实现这一点,可以通过再次引入 wait()
和 notify()
,使用 Semaphore
s,或其他方式,但我会保留它,因为它不清楚你甚至想要它。
关于 InterruptedException
的最后一个说明。如果有人在线程上调用 interrupt()
就会发生这种情况,但是唯一保持对生产者和消费者线程的引用的是 main()
方法,并且它永远不会中断它们。所以异常不应该发生在这里,但如果它以某种方式发生,我只是让生产者或消费者退出。在一个更复杂的场景中,如果线程正在休眠或处于阻塞方法(或者甚至在一个之外,如果它明确检查它),中断线程可以用作向它发出信号的一种方式,但我们在这里没有使用它。
正如你想要的那样,Producer
不停地生产价值,Consumer
不停地消耗这些价值,最终等待价值成为产品,因为有 none , 跳过 while 同步和锁定并使用 LinkedBlockingQueue
在简单生产你:
queue.put(value)
在消费者中你做
value=queue.take();
瞧,消费者将取值,并等待值队列为空。
至于你的代码:
- 您根本没有使用
ReentrantLock
。您可以将其替换为new Object
并且输出将相同。wait
和notify
是Object
的方法。 你之所以只能产生单一的价值,在于你的消费者。实际上你有这样的东西:
synchronized(lock){ // aquire lock's monitor while(true){ lock.notify(); } } // release lock's monitor - never doing that, thread never leaves this block
这里的问题是你的执行从不离开同步块。所以你正在调用 notify
但你不会通过退出 synchronized
块来释放 lock
监视器。生产者是 "notified" 但为了继续执行它必须重新获取 lock
的监视器 - 但它可以,因为消费者永远不会释放它。这里几乎是经典的僵局。
你可以想象房间里有几个人,只有拿着棍子的人可以说话。所以第一个坚持 syncronized(stick)
。做它必须做的事情,并决定它必须 wait
给别人,所以他打电话给 wait
并将棍子递回去。现在第二个人可以说话,做他的工作并决定给他棍子的人现在可以继续了。他跟注 notify
- 现在他必须通过离开 synchronized(stick)
块来传递棍子。如果他不这样做,第一人称无法继续 - 这就是你的情况。
@Antoniosss 是对的。您没有正确使用 ReentrantLock,而是可以将其替换为对象。如果您想改用 ReentrantLock(更新),那么我会建议如下:
package Multithreading;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
class Producer implements Runnable{
private List<String> data;
private ReentrantLock lock;
Producer(List<String> data,ReentrantLock lock)
{
this.data = data;
this.lock = lock;
}
@Override
public void run() {
int counter = 0;
while (true)
{
try {
lock.lock();
if ( data.size() < 5)
{
counter++;
data.add("writing:: " + counter);
}
}finally {
lock.unlock();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private List<String> data;
private ReentrantLock lock;
Consumer(List<String> data,ReentrantLock lock)
{
this.data = data;
this.lock = lock;
}
@Override
public void run() {
int counter = 0;
while (true)
{
try {
lock.lock();
if ( data.size() > 0)
{
System.out.println("reading:: "+data.get(data.size()-1));
data.remove(data.size()-1);
}
}finally {
lock.unlock();
}
try
{
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
List<String> str = new LinkedList<>();
ReentrantLock lock= new ReentrantLock();
Thread t1 = new Thread(new Producer(str,lock));
Thread t2 = new Thread(new Consumer(str,lock));
t1.start();
t2.start();
}
}
我删除了通知调用,但如果你真的需要一次让一个线程进入,只需使用 lock.notify()
我想指出你犯的两个错误:
ReentrantLock
的错误用法。
每个对象都可以用于内在锁,因此无需寻找特定的Lock
class。由于每个 synchronized
块都由一个方法限定并且您不需要 any enhanced means,因此 ReentrantLock
在这里是多余的。
synchronized
块的位置不正确。
一旦你进入一个同步块,在你离开之前没有人能进入那里。显然,你永远不会退出它,因为 while(true)
.
我建议您删除 ReentrantLock
并在 data
上进行同步。
@Override
public void run() {
int counter = 0;
while (true) {
synchronized (data) {
if (data.size() < 5) {
data.add("writing:: " + ++counter);
} else {
try {
data.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// we are out of the synchornized block here
// to let others use data as a monitor somewhere else
}
}