在消费者和生产者线程中等待和通知
Wait and notify in Consumer and Producer Threads
刚开始学习多线程。我在多个线程中有 5 个生产者和 2 个消费者。基本上这个程序将 100 个项目添加到队列中。当队列大小为 100 时,生产者将停止添加。我希望消费者在消费者从队列中删除所有项目时通知生产者,以便生产者可以再次开始添加。目前生产者会等待,但永远不会收到消费者的通知。
制作人:
public class Producer implements Runnable {
private BlockingQueue sharedQueue;
private final int queueSize;
private Object lock = new Object();
public Producer(BlockingQueue sharedQueue, int queueSize){
this.sharedQueue = sharedQueue;
this.queueSize = queueSize;
}
public void run() {
while(true) {
if(sharedQueue.size()== queueSize){
try {
synchronized (lock) {
sharedQueue.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
sharedQueue.put("Producer: " + sharedQueue.size());
Thread.sleep(500);
System.out.println("Producer: Queue Size " + sharedQueue.size() + " Current Thread " + Thread.currentThread());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消费者:
public class Consumer implements Runnable{
private BlockingQueue sharedQueue;
private final int queueSize;
private final int queueEmpty=0;
private Object lock = new Object();
public Consumer(BlockingQueue sharedQueue, int queueSize){
this.sharedQueue = sharedQueue;
this.queueSize = queueSize;
}
//Notify awaiting thread if the sharedQueue is empty
public void run() {
while (true) {
if(sharedQueue.size()==queueEmpty){
synchronized (lock) {
this.notifyAll();
}
}
try {
sharedQueue.take();
Thread.sleep(800);
System.out.println("Consumer: Queue Size " + sharedQueue.size() + " Current Thread" + Thread.currentThread());
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
主要class
public class App{
//A simple program to illustrate how producer and consumer pattern works with blocking queue using executor service
public static void main( String[] args )
{
final BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<String> (100);
final int queueSize =100;
final int producerNum = 5;
final int consumerNum = 2;
final ExecutorService executorProducer = Executors.newFixedThreadPool(producerNum);
final ExecutorService executorConsumer = Executors.newFixedThreadPool(consumerNum);
for(int i=0;i<producerNum;i++){
Producer producer = new Producer(sharedQueue,queueSize);
executorProducer.execute(producer);
}
for(int j=0;j<consumerNum;j++){
Consumer consumer = new Consumer(sharedQueue,queueSize);
executorConsumer.execute(consumer);
}
}
}
来自 oracle 文档 page:
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control
因为您已经在使用 BlockingQueues
,您可以去掉 wait()
和 notify()
API。
多个生产者和消费者使用BlockingQueue
的示例代码:
import java.util.concurrent.*;
public class ProducerConsumerDemo {
public static void main(String args[]){
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
Thread prodThread1 = new Thread(new Producer(sharedQueue,1));
Thread prodThread2 = new Thread(new Producer(sharedQueue,2));
Thread consThread1 = new Thread(new Consumer(sharedQueue,1));
Thread consThread2 = new Thread(new Consumer(sharedQueue,2));
prodThread1.start();
prodThread2.start();
consThread1.start();
consThread2.start();
}
}
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for(int i=1; i<= 5; i++){
try {
int number = i+(10*threadNo);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
它是如何工作的?
- 生产者线程 1 将 11 - 15 范围内的整数放入
BlockingQueue
- 生产者线程 2 将 21 - 25 范围内的整数放入
BlockingQueue
- 任何消费者线程 - 线程 1 或线程 2 从
BlockingQueue
(本例中为整数) 读取值
示例输出:
Produced:21:by thread:2
Produced:11:by thread:1
Produced:12:by thread:1
Produced:13:by thread:1
Produced:14:by thread:1
Produced:22:by thread:2
Produced:23:by thread:2
Produced:24:by thread:2
Produced:25:by thread:2
Consumed: 21:by thread:1
Consumed: 12:by thread:1
Consumed: 13:by thread:1
Consumed: 14:by thread:1
Consumed: 22:by thread:1
Consumed: 23:by thread:1
Consumed: 24:by thread:1
Consumed: 25:by thread:1
Produced:15:by thread:1
Consumed: 11:by thread:2
Consumed: 15:by thread:1
刚开始学习多线程。我在多个线程中有 5 个生产者和 2 个消费者。基本上这个程序将 100 个项目添加到队列中。当队列大小为 100 时,生产者将停止添加。我希望消费者在消费者从队列中删除所有项目时通知生产者,以便生产者可以再次开始添加。目前生产者会等待,但永远不会收到消费者的通知。
制作人:
public class Producer implements Runnable {
private BlockingQueue sharedQueue;
private final int queueSize;
private Object lock = new Object();
public Producer(BlockingQueue sharedQueue, int queueSize){
this.sharedQueue = sharedQueue;
this.queueSize = queueSize;
}
public void run() {
while(true) {
if(sharedQueue.size()== queueSize){
try {
synchronized (lock) {
sharedQueue.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
sharedQueue.put("Producer: " + sharedQueue.size());
Thread.sleep(500);
System.out.println("Producer: Queue Size " + sharedQueue.size() + " Current Thread " + Thread.currentThread());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
消费者:
public class Consumer implements Runnable{
private BlockingQueue sharedQueue;
private final int queueSize;
private final int queueEmpty=0;
private Object lock = new Object();
public Consumer(BlockingQueue sharedQueue, int queueSize){
this.sharedQueue = sharedQueue;
this.queueSize = queueSize;
}
//Notify awaiting thread if the sharedQueue is empty
public void run() {
while (true) {
if(sharedQueue.size()==queueEmpty){
synchronized (lock) {
this.notifyAll();
}
}
try {
sharedQueue.take();
Thread.sleep(800);
System.out.println("Consumer: Queue Size " + sharedQueue.size() + " Current Thread" + Thread.currentThread());
}catch(InterruptedException e){
e.printStackTrace();
}
}
}
}
主要class
public class App{
//A simple program to illustrate how producer and consumer pattern works with blocking queue using executor service
public static void main( String[] args )
{
final BlockingQueue<String> sharedQueue = new ArrayBlockingQueue<String> (100);
final int queueSize =100;
final int producerNum = 5;
final int consumerNum = 2;
final ExecutorService executorProducer = Executors.newFixedThreadPool(producerNum);
final ExecutorService executorConsumer = Executors.newFixedThreadPool(consumerNum);
for(int i=0;i<producerNum;i++){
Producer producer = new Producer(sharedQueue,queueSize);
executorProducer.execute(producer);
}
for(int j=0;j<consumerNum;j++){
Consumer consumer = new Consumer(sharedQueue,queueSize);
executorConsumer.execute(consumer);
}
}
}
来自 oracle 文档 page:
BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control
因为您已经在使用 BlockingQueues
,您可以去掉 wait()
和 notify()
API。
多个生产者和消费者使用BlockingQueue
的示例代码:
import java.util.concurrent.*;
public class ProducerConsumerDemo {
public static void main(String args[]){
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
Thread prodThread1 = new Thread(new Producer(sharedQueue,1));
Thread prodThread2 = new Thread(new Producer(sharedQueue,2));
Thread consThread1 = new Thread(new Consumer(sharedQueue,1));
Thread consThread2 = new Thread(new Consumer(sharedQueue,2));
prodThread1.start();
prodThread2.start();
consThread1.start();
consThread2.start();
}
}
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for(int i=1; i<= 5; i++){
try {
int number = i+(10*threadNo);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
它是如何工作的?
- 生产者线程 1 将 11 - 15 范围内的整数放入
BlockingQueue
- 生产者线程 2 将 21 - 25 范围内的整数放入
BlockingQueue
- 任何消费者线程 - 线程 1 或线程 2 从
BlockingQueue
(本例中为整数) 读取值
示例输出:
Produced:21:by thread:2
Produced:11:by thread:1
Produced:12:by thread:1
Produced:13:by thread:1
Produced:14:by thread:1
Produced:22:by thread:2
Produced:23:by thread:2
Produced:24:by thread:2
Produced:25:by thread:2
Consumed: 21:by thread:1
Consumed: 12:by thread:1
Consumed: 13:by thread:1
Consumed: 14:by thread:1
Consumed: 22:by thread:1
Consumed: 23:by thread:1
Consumed: 24:by thread:1
Consumed: 25:by thread:1
Produced:15:by thread:1
Consumed: 11:by thread:2
Consumed: 15:by thread:1