Java 线程生产者-消费者共享缓冲区
Java Threads producer-consumer shared buffer
使用 Java 中的线程实现生产者消费者问题。生产者和消费者共享一个缓冲区,生产者将元素放入缓冲区,消费者从共享缓冲区中消费元素。如果缓冲区已满,生产者应该等到消费者取出元素,类似地,如果缓冲区为空,消费者应该等到生产者放入元素。
您的程序应接受以下输入:
m: the number of producer threads
n: the number of consumer threads
k: the size of the bounded buffer
您的代码应按顺序提示上述输入。您可以假设用户为其中的每一个都提供了一个有效的整数。您将需要生成 m 个生产者线程和 n 个消费者线程。每个生产者生成 20 个介于 0 和 9 之间的整数,并将它们放入缓冲区。将一个数字放入缓冲区后,它会打印它的 id 以及生成的数字。在再次重复数字生成周期之前,生产者会随机休眠一段时间。每个消费者从缓冲区中取出一个数字,然后打印它的 id 和它得到的值。然后,它会随机休眠一段时间,然后再次从缓冲区读取数据。
该程序的示例输出是:
Producer #2 put: 1
Producer #1 put: 4
Consumer #3 got: 1
Producer #1 put: 3
Consumer #3 got: 4
Consumer #3 got: 3
...
我有这个问题。很明显,缓冲区 数组 是两个方法的全局变量,因为该数组与生产者和消费者共享。所以?不幸的是我不知道如何做这个项目。有人有想法吗?
import java.security.SecureRandom;
import java.util.concurrent.*;
/**
* Created by Leon.H on 2016/1/13.
*/
public class ProducerConsumer {
private int producerNumber = 0;
private int consumerNumber = 0;
private int bufferSize = 0;
private final int seconds;
public ProducerConsumer(int producerNumber, int consumerNumber, int bufferSize, int seconds) {
this.producerNumber = producerNumber;
this.consumerNumber = consumerNumber;
this.bufferSize = bufferSize;
this.seconds = seconds;
System.out.println(this.producerNumber+ ": the number of producer threads");
System.out.println(this.consumerNumber+ ": the number of consumer threads");
System.out.println(this.bufferSize+ ": the number of producer threads");
}
public void process() throws InterruptedException {
ExecutorService producerExecutorService = Executors.newFixedThreadPool(this.producerNumber);
ExecutorService consumerExecutorService = Executors.newFixedThreadPool(this.consumerNumber);
BlockingQueue<Integer> integers = new ArrayBlockingQueue<Integer>(this.bufferSize);
for (int i = 0; i < this.producerNumber; i++) {
producerExecutorService.execute(new ProducerTask(integers));
}
for (int i = 0; i < this.consumerNumber; i++) {
consumerExecutorService.execute(new ConsumerTask(integers));
}
producerExecutorService.shutdown();
consumerExecutorService.shutdown();
//let the program run 10 seconds
producerExecutorService.awaitTermination(this.seconds, TimeUnit.SECONDS);
consumerExecutorService.awaitTermination(this.seconds, TimeUnit.SECONDS);
}
private class ProducerTask implements Runnable {
private final BlockingQueue<Integer> integers;
public ProducerTask(BlockingQueue<Integer> integers) {
this.integers = integers;
}
public void run() {
while (true) {
Integer content = new SecureRandom().nextInt(1000);
System.out.println("Producer #" + Thread.currentThread().getId() + " put: " + content);
integers.offer(content);
try {
Thread.sleep(new SecureRandom().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private class ConsumerTask implements Runnable {
private final BlockingQueue<Integer> integers;
public ConsumerTask(BlockingQueue<Integer> integers) {
this.integers = integers;
}
public void run() {
while (true) {
try {
System.out.println("Consumer #" + Thread.currentThread().getId() + " get: " + integers.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(new SecureRandom().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
import org.junit.Test;
/**
* Created by Leon.H on 2016/1/13.
*/
public class ProducerConsumerTest {
@Test
public void oneProducerOneConsumerSizeOne() throws InterruptedException {
int ProducerNumber = 1;
int ConsumerNumber = 1;
int size = 1;
int seconds=5;
ProducerConsumer producerConsumer = new ProducerConsumer(ProducerNumber, ConsumerNumber, size, seconds);
producerConsumer.process();
}
@Test
public void twoProducerThreeConsumerSizeThree() throws InterruptedException {
int ProducerNumber = 2;
int ConsumerNumber = 3;
int size = 3;
int seconds = 5;
ProducerConsumer producerConsumer = new ProducerConsumer(ProducerNumber, ConsumerNumber, size, seconds);
producerConsumer.process();
}
@Test
public void twoHundredProducerThreeConsumerSizeThree() throws InterruptedException {
int ProducerNumber = 20;
int ConsumerNumber = 3;
int size = 3;
int seconds=5;
ProducerConsumer producerConsumer = new ProducerConsumer(ProducerNumber, ConsumerNumber, size, seconds);
producerConsumer.process();
}
}
使用 Java 中的线程实现生产者消费者问题。生产者和消费者共享一个缓冲区,生产者将元素放入缓冲区,消费者从共享缓冲区中消费元素。如果缓冲区已满,生产者应该等到消费者取出元素,类似地,如果缓冲区为空,消费者应该等到生产者放入元素。 您的程序应接受以下输入:
m: the number of producer threads
n: the number of consumer threads
k: the size of the bounded buffer
您的代码应按顺序提示上述输入。您可以假设用户为其中的每一个都提供了一个有效的整数。您将需要生成 m 个生产者线程和 n 个消费者线程。每个生产者生成 20 个介于 0 和 9 之间的整数,并将它们放入缓冲区。将一个数字放入缓冲区后,它会打印它的 id 以及生成的数字。在再次重复数字生成周期之前,生产者会随机休眠一段时间。每个消费者从缓冲区中取出一个数字,然后打印它的 id 和它得到的值。然后,它会随机休眠一段时间,然后再次从缓冲区读取数据。 该程序的示例输出是:
Producer #2 put: 1
Producer #1 put: 4
Consumer #3 got: 1
Producer #1 put: 3
Consumer #3 got: 4
Consumer #3 got: 3
...
我有这个问题。很明显,缓冲区 数组 是两个方法的全局变量,因为该数组与生产者和消费者共享。所以?不幸的是我不知道如何做这个项目。有人有想法吗?
import java.security.SecureRandom;
import java.util.concurrent.*;
/**
* Created by Leon.H on 2016/1/13.
*/
public class ProducerConsumer {
private int producerNumber = 0;
private int consumerNumber = 0;
private int bufferSize = 0;
private final int seconds;
public ProducerConsumer(int producerNumber, int consumerNumber, int bufferSize, int seconds) {
this.producerNumber = producerNumber;
this.consumerNumber = consumerNumber;
this.bufferSize = bufferSize;
this.seconds = seconds;
System.out.println(this.producerNumber+ ": the number of producer threads");
System.out.println(this.consumerNumber+ ": the number of consumer threads");
System.out.println(this.bufferSize+ ": the number of producer threads");
}
public void process() throws InterruptedException {
ExecutorService producerExecutorService = Executors.newFixedThreadPool(this.producerNumber);
ExecutorService consumerExecutorService = Executors.newFixedThreadPool(this.consumerNumber);
BlockingQueue<Integer> integers = new ArrayBlockingQueue<Integer>(this.bufferSize);
for (int i = 0; i < this.producerNumber; i++) {
producerExecutorService.execute(new ProducerTask(integers));
}
for (int i = 0; i < this.consumerNumber; i++) {
consumerExecutorService.execute(new ConsumerTask(integers));
}
producerExecutorService.shutdown();
consumerExecutorService.shutdown();
//let the program run 10 seconds
producerExecutorService.awaitTermination(this.seconds, TimeUnit.SECONDS);
consumerExecutorService.awaitTermination(this.seconds, TimeUnit.SECONDS);
}
private class ProducerTask implements Runnable {
private final BlockingQueue<Integer> integers;
public ProducerTask(BlockingQueue<Integer> integers) {
this.integers = integers;
}
public void run() {
while (true) {
Integer content = new SecureRandom().nextInt(1000);
System.out.println("Producer #" + Thread.currentThread().getId() + " put: " + content);
integers.offer(content);
try {
Thread.sleep(new SecureRandom().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private class ConsumerTask implements Runnable {
private final BlockingQueue<Integer> integers;
public ConsumerTask(BlockingQueue<Integer> integers) {
this.integers = integers;
}
public void run() {
while (true) {
try {
System.out.println("Consumer #" + Thread.currentThread().getId() + " get: " + integers.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(new SecureRandom().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
import org.junit.Test;
/**
* Created by Leon.H on 2016/1/13.
*/
public class ProducerConsumerTest {
@Test
public void oneProducerOneConsumerSizeOne() throws InterruptedException {
int ProducerNumber = 1;
int ConsumerNumber = 1;
int size = 1;
int seconds=5;
ProducerConsumer producerConsumer = new ProducerConsumer(ProducerNumber, ConsumerNumber, size, seconds);
producerConsumer.process();
}
@Test
public void twoProducerThreeConsumerSizeThree() throws InterruptedException {
int ProducerNumber = 2;
int ConsumerNumber = 3;
int size = 3;
int seconds = 5;
ProducerConsumer producerConsumer = new ProducerConsumer(ProducerNumber, ConsumerNumber, size, seconds);
producerConsumer.process();
}
@Test
public void twoHundredProducerThreeConsumerSizeThree() throws InterruptedException {
int ProducerNumber = 20;
int ConsumerNumber = 3;
int size = 3;
int seconds=5;
ProducerConsumer producerConsumer = new ProducerConsumer(ProducerNumber, ConsumerNumber, size, seconds);
producerConsumer.process();
}
}