使用条件变量的生产者-消费者 Java 实现中的阻塞问题
Blocking in Producer Consumer Java Implementation using Condition Variables
我在使用条件变量实现生产者-消费者并运行时,遇到了消费者阻塞的问题:消费者线程基本上取不到生产者生产的最后一批产品,因此永远不会结束。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Producer/consumer demo: one Producer task and one Consumer task share
* {@code buffer} and coordinate through a single lock with two conditions.
*
* NOTE(review): this version has unbalanced locking. In both tasks
* lock.lock() is called on every loop iteration while lock.unlock() is called
* only once, in the finally block after the loop, so the hold count of the
* reentrant lock is not back at zero when call() returns.
*/
public class ProducerConsumerImpl {
// shared buffer that the Producer appends to and the Consumer removes from
protected List<Integer> buffer = new ArrayList<Integer>();
// the lock from which both condition variables are created (constructed with fairness = true)
protected volatile Lock lock = new ReentrantLock(true);
// signalled by the consumer after it removes an item; the producer awaits it while the buffer holds more than 2 items
protected volatile Condition producerStartProducing = lock.newCondition();
// signalled by the producer after it adds an item; the consumer awaits it while the buffer is empty
protected volatile Condition consumerStartConsuming = lock.newCondition();
/** Task that removes 10 items from the shared buffer, one per loop iteration. */
class Consumer implements Callable<String> {
// value passed to the constructor; it is only stored, never read in the code shown
int num;
public Consumer(int i) {
this.num = i;
}
/**
* Loops 10 times: waits on consumerStartConsuming while the buffer is empty,
* then takes the lock, prints the iteration index, removes the last buffer
* element and signals producerStartProducing.
*
* NOTE(review): buffer.isEmpty() and await() run before lock.lock() in each
* iteration, so on the first iteration they execute without the lock held;
* Condition.await() is documented to require that the caller holds the lock.
* NOTE(review): lock.lock() runs once per iteration but unlock() only once,
* in finally, so the lock stays held reentrantly across iterations.
*
* @return the string "Consumed All" once 10 items have been removed
* @throws Exception declared by Callable; await() can throw InterruptedException
*/
@Override
public String call() throws Exception {
try {
int i = 0;
while (i < 10) {
// checked and awaited before lock.lock() below, i.e. outside the per-iteration lock acquisition
while (buffer.isEmpty()) {
consumerStartConsuming.await();
}
// acquired again on every iteration without a matching unlock inside the loop
lock.lock();
System.out.println("Consumer - " + i);
// int argument selects remove(int index): drops the last element
buffer.remove(buffer.size() - 1);
producerStartProducing.signalAll();
i++;
}
} finally {
// single unlock for up to 10 lock() calls above
lock.unlock();
}
System.out.println("Consumed All");
return "Consumed All";
}
}
/** Task that adds 10 items to the shared buffer, one per loop iteration. */
class Producer implements Callable<String> {
// value passed to the constructor; it is only stored, never read in the code shown
int num;
public Producer(int i) {
this.num = i;
}
/**
* Loops 10 times: takes the lock, waits on producerStartProducing while the
* buffer holds more than 2 items, then prints the iteration index, appends
* the value 1 and signals consumerStartConsuming.
*
* NOTE(review): lock.lock() runs once per iteration but unlock() only once,
* in finally, so after the loop this thread has released just one of its
* 10 holds on the lock.
*
* @return the string "Produced All" once 10 items have been added
* @throws Exception declared by Callable; await() can throw InterruptedException
*/
@Override
public String call() throws Exception {
try {
int i = 0;
while (i < 10) {
// acquired again on every iteration without a matching unlock inside the loop
lock.lock();
// buffer is treated as full at 3 items
while (buffer.size() > 2) {
producerStartProducing.await();
}
System.out.println("Producer - " + i);
buffer.add(1);
consumerStartConsuming.signalAll();
i++;
}
} finally {
// single unlock for the 10 lock() calls above
lock.unlock();
}
System.out.println("Produced All");
return "Produced All";
}
}
/**
* Submits one Producer and one Consumer to a 4-thread pool, blocks on both
* results, prints them joined by " --- ", and shuts the pool down in finally.
*/
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(4);
try {
ProducerConsumerImpl producerConsumerImpl = new ProducerConsumerImpl();
Future<String> p1 = newFixedThreadPool.submit(producerConsumerImpl.new Producer(1));
Future<String> c1 = newFixedThreadPool.submit(producerConsumerImpl.new Consumer(1));
// get() blocks until the corresponding task has returned or thrown
final String string1 = p1.get();
final String string2 = c1.get();
System.out.println(string1 + " --- " + string2);
} finally {
newFixedThreadPool.shutdown();
}
}
}
输出:
Producer - 0
Producer - 1
Producer - 2
Consumer - 0
Consumer - 1
Consumer - 2
Producer - 3
Producer - 4
Producer - 5
Consumer - 3
Consumer - 4
Consumer - 5
Producer - 6
Producer - 7
Producer - 8
Consumer - 6
Consumer - 7
Consumer - 8
Producer - 9
Produced All
生产者代码每批生产三个产品,然后在条件变量上等待,直到消费者取走产品并发出信号。
同样,每次从缓冲区中取出一个项目时,消费者线程都会拾取项目并向生产者发出信号以生产更多项目。
这只是一种随意的实现,但在这段代码中,这类操作的任何其他组合也都存在同样的问题。如果有人能指出这段代码哪里出了问题,我将不胜感激。
加锁范围不正确,因此我在下面的代码中做了修复。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Bounded-buffer producer/consumer demo built on a {@link ReentrantLock} and
 * two {@link Condition}s.
 *
 * <p>One {@link Producer} and one {@link Consumer} share {@link #buffer}. Each
 * task acquires the lock once per item and releases it in a {@code finally}
 * block within the same iteration, so every {@code lock()} is paired with
 * exactly one {@code unlock()}.
 */
public class ProducerConsumerImpl {
    /** Number of items each task produces or consumes before it returns. */
    private static final int ITEM_COUNT = 10;
    /** Maximum number of items the buffer may hold; the producer waits while it is full. */
    private static final int BUFFER_CAPACITY = 3;

    /** Shared buffer; the tasks read and modify it only while holding {@link #lock}. */
    protected final List<Integer> buffer = new ArrayList<Integer>();
    /** Fair lock from which both conditions are created. */
    protected final Lock lock = new ReentrantLock(true);
    /** Signalled by the consumer after it removes an item; awaited by the producer while the buffer is full. */
    protected final Condition producerStartProducing = lock.newCondition();
    /** Signalled by the producer after it adds an item; awaited by the consumer while the buffer is empty. */
    protected final Condition consumerStartConsuming = lock.newCondition();

    /** Task that removes {@code ITEM_COUNT} items from the shared buffer. */
    class Consumer implements Callable<String> {
        /** Identifier supplied by the caller; stored but not otherwise used. */
        final int num;

        public Consumer(int i) {
            this.num = i;
        }

        /**
         * For each item: takes the lock, waits while the buffer is empty,
         * removes the last element, signals the producer and releases the lock.
         *
         * @return {@code "Consumed All"} once every item has been removed
         * @throws Exception if the thread is interrupted while waiting
         */
        @Override
        public String call() throws Exception {
            for (int i = 0; i < ITEM_COUNT; i++) {
                // Acquire before the try block so that finally never unlocks a lock we do not hold.
                lock.lock();
                try {
                    // Loop guards against spurious wake-ups.
                    while (buffer.isEmpty()) {
                        consumerStartConsuming.await();
                    }
                    System.out.println("Consumer - " + i);
                    buffer.remove(buffer.size() - 1);
                    producerStartProducing.signalAll();
                } finally {
                    lock.unlock();
                }
            }
            System.out.println("Consumed All");
            return "Consumed All";
        }
    }

    /** Task that adds {@code ITEM_COUNT} items to the shared buffer. */
    class Producer implements Callable<String> {
        /** Identifier supplied by the caller; stored but not otherwise used. */
        final int num;

        public Producer(int i) {
            this.num = i;
        }

        /**
         * For each item: takes the lock, waits while the buffer is full,
         * appends an element, signals the consumer and releases the lock.
         *
         * @return {@code "Produced All"} once every item has been added
         * @throws Exception if the thread is interrupted while waiting
         */
        @Override
        public String call() throws Exception {
            for (int i = 0; i < ITEM_COUNT; i++) {
                // Acquire before the try block so that finally never unlocks a lock we do not hold.
                lock.lock();
                try {
                    // Loop guards against spurious wake-ups.
                    while (buffer.size() >= BUFFER_CAPACITY) {
                        producerStartProducing.await();
                    }
                    System.out.println("Producer - " + i);
                    buffer.add(1);
                    consumerStartConsuming.signalAll();
                } finally {
                    lock.unlock();
                }
            }
            System.out.println("Produced All");
            return "Produced All";
        }
    }

    /**
     * Runs one producer and one consumer on a thread pool, waits for both to
     * finish and prints their results.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     * @throws ExecutionException if either task fails
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(4);
        try {
            ProducerConsumerImpl producerConsumerImpl = new ProducerConsumerImpl();
            Future<String> p1 = newFixedThreadPool.submit(producerConsumerImpl.new Producer(1));
            Future<String> c1 = newFixedThreadPool.submit(producerConsumerImpl.new Consumer(1));
            final String string1 = p1.get();
            final String string2 = c1.get();
            System.out.println(string1 + " --- " + string2);
        } finally {
            // shutdownNow interrupts a task still parked in await() if its partner failed,
            // so the JVM can exit; when both tasks have finished it behaves like shutdown().
            newFixedThreadPool.shutdownNow();
        }
    }
}
我在使用条件变量实现生产者-消费者并运行时,遇到了消费者阻塞的问题:消费者线程基本上取不到生产者生产的最后一批产品,因此永远不会结束。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Producer/consumer demo: one Producer task and one Consumer task share
* {@code buffer} and coordinate through a single lock with two conditions.
*
* NOTE(review): this version has unbalanced locking. In both tasks
* lock.lock() is called on every loop iteration while lock.unlock() is called
* only once, in the finally block after the loop, so the hold count of the
* reentrant lock is not back at zero when call() returns.
*/
public class ProducerConsumerImpl {
// shared buffer that the Producer appends to and the Consumer removes from
protected List<Integer> buffer = new ArrayList<Integer>();
// the lock from which both condition variables are created (constructed with fairness = true)
protected volatile Lock lock = new ReentrantLock(true);
// signalled by the consumer after it removes an item; the producer awaits it while the buffer holds more than 2 items
protected volatile Condition producerStartProducing = lock.newCondition();
// signalled by the producer after it adds an item; the consumer awaits it while the buffer is empty
protected volatile Condition consumerStartConsuming = lock.newCondition();
/** Task that removes 10 items from the shared buffer, one per loop iteration. */
class Consumer implements Callable<String> {
// value passed to the constructor; it is only stored, never read in the code shown
int num;
public Consumer(int i) {
this.num = i;
}
/**
* Loops 10 times: waits on consumerStartConsuming while the buffer is empty,
* then takes the lock, prints the iteration index, removes the last buffer
* element and signals producerStartProducing.
*
* NOTE(review): buffer.isEmpty() and await() run before lock.lock() in each
* iteration, so on the first iteration they execute without the lock held;
* Condition.await() is documented to require that the caller holds the lock.
* NOTE(review): lock.lock() runs once per iteration but unlock() only once,
* in finally, so the lock stays held reentrantly across iterations.
*
* @return the string "Consumed All" once 10 items have been removed
* @throws Exception declared by Callable; await() can throw InterruptedException
*/
@Override
public String call() throws Exception {
try {
int i = 0;
while (i < 10) {
// checked and awaited before lock.lock() below, i.e. outside the per-iteration lock acquisition
while (buffer.isEmpty()) {
consumerStartConsuming.await();
}
// acquired again on every iteration without a matching unlock inside the loop
lock.lock();
System.out.println("Consumer - " + i);
// int argument selects remove(int index): drops the last element
buffer.remove(buffer.size() - 1);
producerStartProducing.signalAll();
i++;
}
} finally {
// single unlock for up to 10 lock() calls above
lock.unlock();
}
System.out.println("Consumed All");
return "Consumed All";
}
}
/** Task that adds 10 items to the shared buffer, one per loop iteration. */
class Producer implements Callable<String> {
// value passed to the constructor; it is only stored, never read in the code shown
int num;
public Producer(int i) {
this.num = i;
}
/**
* Loops 10 times: takes the lock, waits on producerStartProducing while the
* buffer holds more than 2 items, then prints the iteration index, appends
* the value 1 and signals consumerStartConsuming.
*
* NOTE(review): lock.lock() runs once per iteration but unlock() only once,
* in finally, so after the loop this thread has released just one of its
* 10 holds on the lock.
*
* @return the string "Produced All" once 10 items have been added
* @throws Exception declared by Callable; await() can throw InterruptedException
*/
@Override
public String call() throws Exception {
try {
int i = 0;
while (i < 10) {
// acquired again on every iteration without a matching unlock inside the loop
lock.lock();
// buffer is treated as full at 3 items
while (buffer.size() > 2) {
producerStartProducing.await();
}
System.out.println("Producer - " + i);
buffer.add(1);
consumerStartConsuming.signalAll();
i++;
}
} finally {
// single unlock for the 10 lock() calls above
lock.unlock();
}
System.out.println("Produced All");
return "Produced All";
}
}
/**
* Submits one Producer and one Consumer to a 4-thread pool, blocks on both
* results, prints them joined by " --- ", and shuts the pool down in finally.
*/
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(4);
try {
ProducerConsumerImpl producerConsumerImpl = new ProducerConsumerImpl();
Future<String> p1 = newFixedThreadPool.submit(producerConsumerImpl.new Producer(1));
Future<String> c1 = newFixedThreadPool.submit(producerConsumerImpl.new Consumer(1));
// get() blocks until the corresponding task has returned or thrown
final String string1 = p1.get();
final String string2 = c1.get();
System.out.println(string1 + " --- " + string2);
} finally {
newFixedThreadPool.shutdown();
}
}
}
输出:
Producer - 0
Producer - 1
Producer - 2
Consumer - 0
Consumer - 1
Consumer - 2
Producer - 3
Producer - 4
Producer - 5
Consumer - 3
Consumer - 4
Consumer - 5
Producer - 6
Producer - 7
Producer - 8
Consumer - 6
Consumer - 7
Consumer - 8
Producer - 9
Produced All
生产者代码每批生产三个产品,然后在条件变量上等待,直到消费者取走产品并发出信号。
同样,每次从缓冲区中取出一个项目时,消费者线程都会拾取项目并向生产者发出信号以生产更多项目。
这只是一种随意的实现,但在这段代码中,这类操作的任何其他组合也都存在同样的问题。如果有人能指出这段代码哪里出了问题,我将不胜感激。
加锁范围不正确,因此我在下面的代码中做了修复。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Bounded-buffer producer/consumer demo built on a {@link ReentrantLock} and
 * two {@link Condition}s.
 *
 * <p>One {@link Producer} and one {@link Consumer} share {@link #buffer}. Each
 * task acquires the lock once per item and releases it in a {@code finally}
 * block within the same iteration, so every {@code lock()} is paired with
 * exactly one {@code unlock()}.
 */
public class ProducerConsumerImpl {
    /** Number of items each task produces or consumes before it returns. */
    private static final int ITEM_COUNT = 10;
    /** Maximum number of items the buffer may hold; the producer waits while it is full. */
    private static final int BUFFER_CAPACITY = 3;

    /** Shared buffer; the tasks read and modify it only while holding {@link #lock}. */
    protected final List<Integer> buffer = new ArrayList<Integer>();
    /** Fair lock from which both conditions are created. */
    protected final Lock lock = new ReentrantLock(true);
    /** Signalled by the consumer after it removes an item; awaited by the producer while the buffer is full. */
    protected final Condition producerStartProducing = lock.newCondition();
    /** Signalled by the producer after it adds an item; awaited by the consumer while the buffer is empty. */
    protected final Condition consumerStartConsuming = lock.newCondition();

    /** Task that removes {@code ITEM_COUNT} items from the shared buffer. */
    class Consumer implements Callable<String> {
        /** Identifier supplied by the caller; stored but not otherwise used. */
        final int num;

        public Consumer(int i) {
            this.num = i;
        }

        /**
         * For each item: takes the lock, waits while the buffer is empty,
         * removes the last element, signals the producer and releases the lock.
         *
         * @return {@code "Consumed All"} once every item has been removed
         * @throws Exception if the thread is interrupted while waiting
         */
        @Override
        public String call() throws Exception {
            for (int i = 0; i < ITEM_COUNT; i++) {
                // Acquire before the try block so that finally never unlocks a lock we do not hold.
                lock.lock();
                try {
                    // Loop guards against spurious wake-ups.
                    while (buffer.isEmpty()) {
                        consumerStartConsuming.await();
                    }
                    System.out.println("Consumer - " + i);
                    buffer.remove(buffer.size() - 1);
                    producerStartProducing.signalAll();
                } finally {
                    lock.unlock();
                }
            }
            System.out.println("Consumed All");
            return "Consumed All";
        }
    }

    /** Task that adds {@code ITEM_COUNT} items to the shared buffer. */
    class Producer implements Callable<String> {
        /** Identifier supplied by the caller; stored but not otherwise used. */
        final int num;

        public Producer(int i) {
            this.num = i;
        }

        /**
         * For each item: takes the lock, waits while the buffer is full,
         * appends an element, signals the consumer and releases the lock.
         *
         * @return {@code "Produced All"} once every item has been added
         * @throws Exception if the thread is interrupted while waiting
         */
        @Override
        public String call() throws Exception {
            for (int i = 0; i < ITEM_COUNT; i++) {
                // Acquire before the try block so that finally never unlocks a lock we do not hold.
                lock.lock();
                try {
                    // Loop guards against spurious wake-ups.
                    while (buffer.size() >= BUFFER_CAPACITY) {
                        producerStartProducing.await();
                    }
                    System.out.println("Producer - " + i);
                    buffer.add(1);
                    consumerStartConsuming.signalAll();
                } finally {
                    lock.unlock();
                }
            }
            System.out.println("Produced All");
            return "Produced All";
        }
    }

    /**
     * Runs one producer and one consumer on a thread pool, waits for both to
     * finish and prints their results.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     * @throws ExecutionException if either task fails
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(4);
        try {
            ProducerConsumerImpl producerConsumerImpl = new ProducerConsumerImpl();
            Future<String> p1 = newFixedThreadPool.submit(producerConsumerImpl.new Producer(1));
            Future<String> c1 = newFixedThreadPool.submit(producerConsumerImpl.new Consumer(1));
            final String string1 = p1.get();
            final String string2 = c1.get();
            System.out.println(string1 + " --- " + string2);
        } finally {
            // shutdownNow interrupts a task still parked in await() if its partner failed,
            // so the JVM can exit; when both tasks have finished it behaves like shutdown().
            newFixedThreadPool.shutdownNow();
        }
    }
}