
Consumer, producer - mutex, sync - critical section


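(The producer may produce if there is space and the consumer is not consuming, and vice versa.)

But I have a question about the shared resource: does Java have something similar to semaphores in C (using the wait and post functions)?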


In ThreadContainer I set up some locks that I found and tried out, but they do not work well:

    at java.lang.Object.notify(Native Method)Running Consumer 0 [1]
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Unknown Source)

I would be grateful if someone could explain the "how to".



public class MyThread implements Runnable {
    private Thread t;
    private String threadName;

    private ThreadContainer container;

    MyThread(String name, ThreadContainer cont) {
        threadName = name;
        this.container = cont;
        System.out.println("Creating " + threadName);
    }

    public void run() {
        // Overridden by Producer and Consumer.
    }

    public void start() {
        System.out.println("Starting " + threadName);
        if (t == null) {
            t = new Thread(this, threadName);
            t.start();
        }
    }

    public ThreadContainer getContainer() {
        return container;
    }

    public String getThreadName() {
        return threadName;
    }
}


 public class Producer extends MyThread {

    Producer(String name, ThreadContainer cont) {
        super(name, cont);
    }

    public void produce(int amount) {
        super.getContainer().produce(amount);
    }

    public void run() {
        System.out.println("Running " + super.getThreadName());
        try {
            for (int i = 10; i > 0; i--) {

                synchronized (super.getContainer().lock) {
                    System.out.println(super.getThreadName()
                            + " want to produce: " + i);
                    // Wait on the same object the block synchronizes on.
                    while (!super.getContainer().canProduce(i)) {
                        super.getContainer().lock.wait();
                    }
                    System.out.println(super.getThreadName() + " producing: "
                            + i);
                    produce(i);
                    System.out.println("Container state: "
                            + super.getContainer());
                    super.getContainer().lock.notifyAll();
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Thread " + super.getThreadName()
                    + " interrupted.");
        }
        System.out.println("Thread " + super.getThreadName() + " exiting.");
    }
}



 public class Consumer extends MyThread {

    Consumer(String name, ThreadContainer cont) {
        super(name, cont);
    }

    public void consume(int am) {
        super.getContainer().consume(am);
    }

    public void run() {
        System.out.println("Running " + super.getThreadName());
        try {
            for (int i = 10; i > 0; i--) {
                synchronized (super.getContainer().lock) {
                    System.out.println(super.getThreadName()
                            + " want to consume: " + i);
                    // Wait on the same object the block synchronizes on.
                    while (!super.getContainer().canConsume(i)) {
                        super.getContainer().lock.wait();
                    }
                    System.out.println(super.getThreadName() + " consuming: "
                            + i);
                    consume(i);
                    System.out.println("Container state: "
                            + super.getContainer());
                    super.getContainer().lock.notifyAll();
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Thread " + super.getThreadName()
                    + " interrupted.");
        }
        System.out.println("Thread " + super.getThreadName() + " exiting.");
    }
}



import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadContainer {
    private int capacity;
    private int value;

    // Monitor object that Producer and Consumer synchronize on.
    public final Object lock = new Object();
    private Lock locky = new ReentrantLock(true);

    public ThreadContainer(int capacity) {
        this.capacity = capacity;
        this.value = 0;
    }

    public void produce(int amount){
        if(this.value + amount <= this.capacity){
            this.value += amount;
        } else {
            this.value = capacity;
        }
    }

    public void consume(int amount){
        if(this.value - amount >= 0 ){
            this.value -= amount;
        } else {
            this.value = 0;
        }
    }

    public boolean canProduce(int am){
        return (this.value + am) <= this.capacity;
    }

    public boolean canConsume(int am){
        return (this.value - am) >= 0;
    }

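    public boolean tryLock(){
        if (locky.tryLock()) {
            return true;
        } else {
            return false;
        }
    }

    public void unlock(){
        locky.unlock();
    }

    public void waitLock() throws InterruptedException{
        // Blocks until the lock is acquired; can be interrupted while waiting.
        locky.lockInterruptibly();
    }

    public String toString() {
        return "capacity: " + this.capacity + ", value: " + this.value;
    }
}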



public class RunFrom {
    public static void main(String args[]) {
        ThreadContainer container = new ThreadContainer(25);

        // Producer prod = new Producer("Producer", container);

        // Consumer cons = new Consumer("Consumer", container);

        int prodCount =0;
        int conCount =0;
        // Even i starts a producer, odd i starts a consumer.
        for (int i = 0; i < 5; i++) {
            if(i%2 == 0){
                Producer prod = new Producer("Producer " + prodCount + " [" + i + "]", container);
                prod.start();
                prodCount++;
            } else {
                Consumer cons = new Consumer("Consumer " + conCount + " [" + i + "]", container);
                cons.start();
                conCount++;
            }
        }
    }
}

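So, following the link @fildor posted, I made the changes below. It seems to work with 2 threads (1 consumer and 1 producer), but there are still problems when I create more threads.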


    try {
                for (int i = 10; i > 0; i--) {
                    System.out.println(super.getThreadName() + " want to consume: "
                            + i);
                    System.out.println(super.getThreadName() + " consuming: " + i);
                    super.getContainer().consume(i);
                    System.out.println("Container state: " + super.getContainer());
                }
            } catch (InterruptedException e) {
                System.out.println("Thread " + super.getThreadName()
                        + " interrupted.");
            }


try {
            for (int i = 10; i > 0; i--) {
                System.out.println(super.getThreadName() + " want to produce: "
                        + i);
                System.out.println(super.getThreadName() + " producing: " + i);
                super.getContainer().produce(i);
                System.out.println("Container state: " + super.getContainer());
            }
        } catch (InterruptedException e) {
            System.out.println("Thread " + super.getThreadName()
                    + " interrupted.");
        }


final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

public void produce(int amount) {
        lock.lock();
        try {
            while (!canProduce(amount)) {
                // Object.wait(), not Condition.await(): the call in the stack trace below
                notFull.wait();
            }

            if (this.value + amount <= this.capacity) {
                this.value += amount;
            } else {
                this.value = capacity;
            }
            notEmpty.signal();
        } catch (InterruptedException e) {
            System.out.println("InterruptedException" + e);
        } finally {
            lock.unlock();
        }
    }

    public void consume(int amount) {
        lock.lock();
        try {
            while (!canConsume(amount)) {
                // Same Object.wait() call as in produce()
                notEmpty.wait();
            }

            if (this.value - amount >= 0) {
                this.value -= amount;
            } else {
                this.value = 0;
            }
            notFull.signal();
        } catch (InterruptedException e) {
            System.out.println("InterruptedException" + e);
        } finally {
            lock.unlock();
        }
    }

With 4 threads (2 producers and 2 consumers) the output is as follows:

Creating Producer 0 [0]
Starting Producer 0 [0]
Running Producer 0 [0]
Producer 0 [0] want to produce: 10
Producer 0 [0] producing: 10
Container state: capacity: 25, value: 10
Creating Consumer 0 [1]
Starting Consumer 0 [1]
Creating Producer 1 [2]
Starting Producer 1 [2]
Creating Consumer 1 [3]
Running Consumer 0 [1]
Starting Consumer 1 [3]
Creating Producer 2 [4]
Starting Producer 2 [4]
Running Producer 1 [2]
Producer 1 [2] want to produce: 10
Producer 1 [2] producing: 10
Consumer 0 [1] want to consume: 10
Consumer 0 [1] consuming: 10
Container state: capacity: 25, value: 20
Container state: capacity: 25, value: 10
Running Consumer 1 [3]
Consumer 1 [3] want to consume: 10
Running Producer 2 [4]
Producer 2 [4] want to produce: 10
Producer 2 [4] producing: 10
Container state: capacity: 25, value: 20
Consumer 1 [3] consuming: 10
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 9
Producer 0 [0] producing: 9
Container state: capacity: 25, value: 19
Consumer 0 [1] want to consume: 9
Consumer 0 [1] consuming: 9
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 9
Producer 1 [2] producing: 9
Container state: capacity: 25, value: 19
Producer 2 [4] want to produce: 9
Producer 2 [4] producing: 9
Exception in thread "Producer 2 [4]" Consumer 1 [3] want to consume: 9
Consumer 1 [3] consuming: 9
Container state: capacity: 25, value: 10
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Unknown Source)
    at test.ThreadContainer.produce(ThreadContainer.java:24)
    at test.Producer.run(Producer.java:21)
    at java.lang.Thread.run(Unknown Source)
Producer 0 [0] want to produce: 8
Producer 0 [0] producing: 8
Container state: capacity: 25, value: 18
Consumer 0 [1] want to consume: 8
Consumer 0 [1] consuming: 8
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 8
Producer 1 [2] producing: 8
Container state: capacity: 25, value: 18
Consumer 1 [3] want to consume: 8
Consumer 1 [3] consuming: 8
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 7
Producer 0 [0] producing: 7
Container state: capacity: 25, value: 17
Consumer 0 [1] want to consume: 7
Consumer 0 [1] consuming: 7
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 7
Producer 1 [2] producing: 7
Container state: capacity: 25, value: 17
Consumer 1 [3] want to consume: 7
Consumer 1 [3] consuming: 7
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 6
Producer 0 [0] producing: 6
Container state: capacity: 25, value: 16
Consumer 0 [1] want to consume: 6
Consumer 0 [1] consuming: 6
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 6
Producer 1 [2] producing: 6
Container state: capacity: 25, value: 16
Consumer 1 [3] want to consume: 6
Consumer 1 [3] consuming: 6
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 5
Producer 0 [0] producing: 5
Container state: capacity: 25, value: 15
Consumer 0 [1] want to consume: 5
Consumer 0 [1] consuming: 5
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 5
Producer 1 [2] producing: 5
Container state: capacity: 25, value: 15
Consumer 1 [3] want to consume: 5
Consumer 1 [3] consuming: 5
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 4
Producer 0 [0] producing: 4
Container state: capacity: 25, value: 14
Consumer 0 [1] want to consume: 4
Consumer 0 [1] consuming: 4
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 4
Producer 1 [2] producing: 4
Container state: capacity: 25, value: 14
Consumer 1 [3] want to consume: 4
Consumer 1 [3] consuming: 4
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 3
Producer 0 [0] producing: 3
Container state: capacity: 25, value: 13
Consumer 0 [1] want to consume: 3
Consumer 0 [1] consuming: 3
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 3
Producer 1 [2] producing: 3
Container state: capacity: 25, value: 13
Consumer 1 [3] want to consume: 3
Consumer 1 [3] consuming: 3
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 2
Producer 0 [0] producing: 2
Container state: capacity: 25, value: 12
Consumer 0 [1] want to consume: 2
Consumer 0 [1] consuming: 2
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 2
Producer 1 [2] producing: 2
Container state: capacity: 25, value: 12
Consumer 1 [3] want to consume: 2
Consumer 1 [3] consuming: 2
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 1
Producer 0 [0] producing: 1
Container state: capacity: 25, value: 11
Consumer 0 [1] want to consume: 1
Consumer 0 [1] consuming: 1
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 1
Producer 1 [2] producing: 1
Container state: capacity: 25, value: 11
Consumer 1 [3] want to consume: 1
Consumer 1 [3] consuming: 1
Container state: capacity: 25, value: 10
Thread Producer 0 [0] exiting.
Thread Consumer 0 [1] exiting.
Thread Producer 1 [2] exiting.
Thread Consumer 1 [3] exiting.


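You really should read a "Threads in Java 101" tutorial somewhere. The exception you are getting is because you are waiting on an object without having acquired its intrinsic lock. Given any object identified by lock, the idiomatic code is:

synchronized (lock) {
  while (!condition) {
    lock.wait();
  }
}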
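
To make that idiom concrete, here is a minimal sketch of a bounded counter that waits and notifies on one private lock object. The names BoundedCounter and GuardedBlockDemo are made up for illustration and do not come from the question; the capacity of 25 and the 10-down-to-1 loops are borrowed from it.

```java
/** Hypothetical demo class: a counter guarded by one private lock object. */
class BoundedCounter {
    private final Object lock = new Object();
    private final int capacity;
    private int value = 0;

    BoundedCounter(int capacity) {
        this.capacity = capacity;
    }

    /** Blocks until there is room for amount, then adds it. */
    void produce(int amount) throws InterruptedException {
        synchronized (lock) {
            while (value + amount > capacity) {
                lock.wait();      // releases the monitor while waiting
            }
            value += amount;
            lock.notifyAll();     // wake threads waiting for stock
        }
    }

    /** Blocks until at least amount is available, then removes it. */
    void consume(int amount) throws InterruptedException {
        synchronized (lock) {
            while (value - amount < 0) {
                lock.wait();
            }
            value -= amount;
            lock.notifyAll();     // wake threads waiting for space
        }
    }

    int value() {
        synchronized (lock) {
            return value;
        }
    }
}

class GuardedBlockDemo {
    /** Runs one producer and one consumer and returns the final value. */
    static int run() {
        BoundedCounter counter = new BoundedCounter(25);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 10; i > 0; i--) {
                    counter.produce(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 10; i > 0; i--) {
                    counter.consume(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.value();
    }

    public static void main(String[] args) {
        System.out.println("final value: " + run());
    }
}
```

Every wait() and notifyAll() call sits inside a synchronized block on that same object, which is what keeps the exception from the question away. With one producer and one consumer, both sides move 55 units in total, so the counter ends at 0.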


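What is the benefit of your MyThread class? All I see is code that starts a thread. I don't need to define a new class for that. I can start a new thread in one line of code:

new Thread(new Producer(...)).start();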

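Then there is your ThreadContainer class, which has some methods that are meant to be called only by consumer threads and other methods that are meant to be called only by producer threads. That violates a basic design principle: each class should be responsible for just one thing.

A lot of your code is glue that binds things to other things (for example, getter and setter methods are glue, and your MyThread class is nothing but glue). Binding things to other things is called coupling, and when you have a lot of coupling, that is called tight coupling.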



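Finally, if I wanted to demonstrate the producer/consumer concept, I would not clutter the demonstration with wait()/notify(). The wait() and notify() methods are low-level primitives meant for implementing higher-level synchronization objects, and they should be hidden from code that operates at a higher level.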
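
As a sketch of what "hidden" can look like, and since the question asks about C-style semaphores, here is a toy counting semaphore whose wait()/notifyAll() calls never leak out of the class. SimpleSemaphore and SemaphoreDemo are hypothetical names used only for illustration. The standard library already ships java.util.concurrent.Semaphore with acquire() and release(), which is likely the better choice in real code.

```java
/** Hypothetical toy semaphore; callers never see wait() or notifyAll(). */
class SimpleSemaphore {
    private int permits;

    SimpleSemaphore(int permits) {
        this.permits = permits;
    }

    /** Roughly sem_wait in C: blocks until a permit is available. */
    synchronized void acquire() throws InterruptedException {
        while (permits == 0) {
            wait();
        }
        permits--;
    }

    /** Roughly sem_post in C: returns a permit and wakes waiters. */
    synchronized void release() {
        permits++;
        notifyAll();
    }
}

class SemaphoreDemo {
    private static int counter = 0;

    /** Four threads increment a shared counter 1000 times each. */
    static int run() {
        counter = 0;
        SimpleSemaphore mutex = new SimpleSemaphore(1); // one permit = mutex
        Thread[] workers = new Thread[4];
        for (int t = 0; t < workers.length; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    try {
                        mutex.acquire();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    try {
                        counter++;          // critical section
                    } finally {
                        mutex.release();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        System.out.println("counter: " + run());
    }
}
```

The worker code only ever calls acquire() and release(); the monitor handling stays in one small class where it is easy to check.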


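The Java standard library provides plenty of ready-made, higher-level synchronization objects for you to use. Probably the most versatile is BlockingQueue. My producer/consumer demo would have a producer thread that stuffs "products" into an ArrayBlockingQueue, and a consumer thread that pulls them out and operates on them.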
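
A minimal sketch of such a demo might look like the following. The class name BlockingQueueDemo, the queue capacity of 3, the ten integer "products", and the -1 end-of-stream marker are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {
    // Hypothetical marker telling the consumer that no more products follow.
    private static final int POISON = -1;

    /** Runs one producer and one consumer; returns what was consumed. */
    static List<Integer> run() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    queue.put(i);        // blocks while the queue is full
                }
                queue.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");

        Thread consumer = new Thread(() -> {
            try {
                // take() blocks while the queue is empty
                for (int item = queue.take(); item != POISON; item = queue.take()) {
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed; // only read after the consumer thread has finished
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

There is no explicit lock, wait(), or notify() anywhere in this code: put() and take() do the blocking, and the queue preserves insertion order for a single producer and a single consumer.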