我实现生产者-消费者问题的方法是否正确?
Is my approach to implement Producer-Consumer problem correct?
我已经使用 wait/notify 组合实现了生产者问题。有人可以告诉我我对生产者消费者问题的理解是否正确,我的实现是否 correct/optimized?
现在我在想如何使用ExecutorService
和CountDownLatch
、ReentrantLock
、CyclicBarrier
来解决同样的问题?有什么办法吗?同时我会尝试看看我是否可以使用闩锁实现问题解决方案。
import java.util.ArrayList;
import java.util.EmptyStackException;
import java.util.Random;
public class ProducerConsumerProblem {
private Object syncher = new Object();
private volatile ArrayList<Integer> sharedBuffer = new ArrayList<Integer>();
public static void main(String[] args) {
ProducerConsumerProblem object = new ProducerConsumerProblem();
Thread producerThread = new Thread(() -> {
object.produceData();
},"Producer");
Thread consumerThread = new Thread(() -> {
object.consumeData();
},"Consumer");
producerThread.start();
consumerThread.start();
}
public void produceData() {
Random randomNumber = new Random();
while(true) {
synchronized (syncher) {
if(sharedBuffer.size() == 1) {
try {
//System.out.println("Producer waiting...");
syncher.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Integer producedElem = randomNumber.nextInt(10);
System.out.println("+++ Produced: "+producedElem);
sharedBuffer.add(producedElem);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
syncher.notify();
}
}
}
public void consumeData() {
while(true) {
synchronized (syncher) {
while(sharedBuffer.size() == 0) {
try {
//System.out.println("Consumer waiting...");
syncher.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Integer consumedElem = sharedBuffer.stream().findAny().orElseThrow(()-> new EmptyStackException());
System.out.println("--- Consumed: "+consumedElem);
sharedBuffer.remove(consumedElem);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
syncher.notify();
}
}
}
}
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerProblemUsingBlockingQueue {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(1);
public static void main(String[] args) {
ProducerConsumerProblemUsingBlockingQueue object = new ProducerConsumerProblemUsingBlockingQueue();
Thread producerThread = new Thread(() -> {
object.produceData(object.blockingQueue);
},"Producer");
Thread consumerThread = new Thread(() -> {
object.consumeData(object.blockingQueue);
},"Consumer");
consumerThread.start();
producerThread.start();
}
private void consumeData(BlockingQueue<Integer> blockingQueue) {
for(int i = 0; i < 10; i++) {
try {
System.out.println("Consumed: "+blockingQueue.take().intValue());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void produceData(BlockingQueue<Integer> blockingQueue) {
Random randomObject = new Random();
for(int i = 0; i < 10; i++) {
try {
int randomNumber = randomObject.nextInt(100);
System.out.println("Produced: "+randomNumber);
blockingQueue.put(randomNumber);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
我已经使用 wait/notify 组合实现了生产者问题。有人可以告诉我我对生产者消费者问题的理解是否正确,我的实现是否 correct/optimized?
现在我在想如何使用ExecutorService
和CountDownLatch
、ReentrantLock
、CyclicBarrier
来解决同样的问题?有什么办法吗?同时我会尝试看看我是否可以使用闩锁实现问题解决方案。
import java.util.ArrayList;
import java.util.EmptyStackException;
import java.util.Random;
public class ProducerConsumerProblem {
private Object syncher = new Object();
private volatile ArrayList<Integer> sharedBuffer = new ArrayList<Integer>();
public static void main(String[] args) {
ProducerConsumerProblem object = new ProducerConsumerProblem();
Thread producerThread = new Thread(() -> {
object.produceData();
},"Producer");
Thread consumerThread = new Thread(() -> {
object.consumeData();
},"Consumer");
producerThread.start();
consumerThread.start();
}
public void produceData() {
Random randomNumber = new Random();
while(true) {
synchronized (syncher) {
if(sharedBuffer.size() == 1) {
try {
//System.out.println("Producer waiting...");
syncher.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Integer producedElem = randomNumber.nextInt(10);
System.out.println("+++ Produced: "+producedElem);
sharedBuffer.add(producedElem);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
syncher.notify();
}
}
}
public void consumeData() {
while(true) {
synchronized (syncher) {
while(sharedBuffer.size() == 0) {
try {
//System.out.println("Consumer waiting...");
syncher.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Integer consumedElem = sharedBuffer.stream().findAny().orElseThrow(()-> new EmptyStackException());
System.out.println("--- Consumed: "+consumedElem);
sharedBuffer.remove(consumedElem);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
syncher.notify();
}
}
}
}
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerProblemUsingBlockingQueue {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(1);
public static void main(String[] args) {
ProducerConsumerProblemUsingBlockingQueue object = new ProducerConsumerProblemUsingBlockingQueue();
Thread producerThread = new Thread(() -> {
object.produceData(object.blockingQueue);
},"Producer");
Thread consumerThread = new Thread(() -> {
object.consumeData(object.blockingQueue);
},"Consumer");
consumerThread.start();
producerThread.start();
}
private void consumeData(BlockingQueue<Integer> blockingQueue) {
for(int i = 0; i < 10; i++) {
try {
System.out.println("Consumed: "+blockingQueue.take().intValue());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void produceData(BlockingQueue<Integer> blockingQueue) {
Random randomObject = new Random();
for(int i = 0; i < 10; i++) {
try {
int randomNumber = randomObject.nextInt(100);
System.out.println("Produced: "+randomNumber);
blockingQueue.put(randomNumber);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}