Java 线程生产者-消费者共享缓冲区

Java Threads producer-consumer shared buffer

使用 Java 中的线程实现生产者消费者问题。生产者和消费者共享一个缓冲区,生产者将元素放入缓冲区,消费者从共享缓冲区中消费元素。如果缓冲区已满,生产者应该等到消费者取出元素,类似地,如果缓冲区为空,消费者应该等到生产者放入元素。 您的程序应接受以下输入:

m: the number of producer threads
n: the number of consumer threads
k: the size of the bounded buffer

您的代码应按顺序提示上述输入。您可以假设用户为其中的每一个都提供了一个有效的整数。您将需要生成 m 个生产者线程和 n 个消费者线程。每个生产者生成 20 个介于 0 和 9 之间的整数,并将它们放入缓冲区。将一个数字放入缓冲区后,它会打印它的 id 以及生成的数字。在再次重复数字生成周期之前,生产者会随机休眠一段时间。每个消费者从缓冲区中取出一个数字,然后打印它的 id 和它得到的值。然后,它会随机休眠一段时间,然后再次从缓冲区读取数据。 该程序的示例输出是:

Producer #2 put: 1
Producer #1 put: 4
Consumer #3 got: 1
Producer #1 put: 3
Consumer #3 got: 4
Consumer #3 got: 3
...

我有这个问题。很明显,缓冲区 数组 是两个方法的全局变量,因为该数组与生产者和消费者共享。所以?不幸的是我不知道如何做这个项目。有人有想法吗?

import java.security.SecureRandom;
import java.util.concurrent.*;

/**
 * Created by Leon.H on 2016/1/13.
 */
public class ProducerConsumer {
    private int producerNumber = 0;
    private int consumerNumber = 0;
    private int bufferSize = 0;
    private final int seconds;

    public ProducerConsumer(int producerNumber, int consumerNumber, int bufferSize, int seconds) {
        this.producerNumber = producerNumber;
        this.consumerNumber = consumerNumber;
        this.bufferSize = bufferSize;
        this.seconds = seconds;
        System.out.println(this.producerNumber+ ": the number of producer threads");
        System.out.println(this.consumerNumber+ ": the number of consumer threads");
        System.out.println(this.bufferSize+ ": the number of producer threads");
    }

    public void process() throws InterruptedException {
        ExecutorService producerExecutorService = Executors.newFixedThreadPool(this.producerNumber);
        ExecutorService consumerExecutorService = Executors.newFixedThreadPool(this.consumerNumber);
        BlockingQueue<Integer> integers = new ArrayBlockingQueue<Integer>(this.bufferSize);

        for (int i = 0; i < this.producerNumber; i++) {
            producerExecutorService.execute(new ProducerTask(integers));
        }

        for (int i = 0; i < this.consumerNumber; i++) {
            consumerExecutorService.execute(new ConsumerTask(integers));
        }

        producerExecutorService.shutdown();
        consumerExecutorService.shutdown();

        //let the program run 10 seconds
        producerExecutorService.awaitTermination(this.seconds, TimeUnit.SECONDS);
        consumerExecutorService.awaitTermination(this.seconds, TimeUnit.SECONDS);
    }

    private class ProducerTask implements Runnable {
        private final BlockingQueue<Integer> integers;

        public ProducerTask(BlockingQueue<Integer> integers) {
            this.integers = integers;
        }

        public void run() {
            while (true) {
                Integer content = new SecureRandom().nextInt(1000);
                System.out.println("Producer #" + Thread.currentThread().getId() + " put: " + content);
                integers.offer(content);
                try {
                    Thread.sleep(new SecureRandom().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private class ConsumerTask implements Runnable {
        private final BlockingQueue<Integer> integers;

        public ConsumerTask(BlockingQueue<Integer> integers) {
            this.integers = integers;
        }

        public void run() {
            while (true) {
                try {
                    System.out.println("Consumer #" + Thread.currentThread().getId() + " get: " + integers.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    Thread.sleep(new SecureRandom().nextInt(1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

import org.junit.Test;

/**
 * Created by Leon.H on 2016/1/13.
 */
public class ProducerConsumerTest {
    @Test
    public void oneProducerOneConsumerSizeOne() throws InterruptedException {
        int ProducerNumber = 1;
        int ConsumerNumber = 1;
        int size = 1;
        int seconds=5;
        ProducerConsumer producerConsumer = new ProducerConsumer(ProducerNumber, ConsumerNumber, size, seconds);
        producerConsumer.process();
    }

    @Test
    public void twoProducerThreeConsumerSizeThree() throws InterruptedException {
        int ProducerNumber = 2;
        int ConsumerNumber = 3;
        int size = 3;
        int seconds = 5;
        ProducerConsumer producerConsumer = new ProducerConsumer(ProducerNumber, ConsumerNumber, size, seconds);
        producerConsumer.process();
    }

    @Test
    public void twoHundredProducerThreeConsumerSizeThree() throws InterruptedException {
        int ProducerNumber = 20;
        int ConsumerNumber = 3;
        int size = 3;
        int seconds=5;
        ProducerConsumer producerConsumer = new ProducerConsumer(ProducerNumber, ConsumerNumber, size, seconds);
        producerConsumer.process();
    }
}