Consumer, producer: mutex, synchronization, critical section

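I worked through some typical multithreading examples, and after that I wanted to try the classic producer-consumer problem.

(The producer can produce if there is space and the consumer is not consuming, and vice versa.)

But I have a question about the shared resource: is there something in Java similar to semaphores in C (with the wait and post functions)?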

This is what I have. In ThreadContainer I set up some locks that I found and tried out, but it does not work well:

java.lang.IllegalMonitorStateException
    at java.lang.Object.notify(Native Method)Running Consumer 0 [1]
java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Unknown Source)
(etc.)

I would be grateful if someone could explain the "how to".

The classes are below.

MyThread:

public class MyThread implements Runnable {
    private Thread t;
    private String threadName;

    private ThreadContainer container;

    MyThread(String name, ThreadContainer cont) {
        threadName = name;
        this.container = cont;
        System.out.println("Creating " + threadName);
    }

    public void run() {

    }

    public void start() {
        System.out.println("Starting " + threadName);
        if (t == null) {
            t = new Thread(this, threadName);
            t.start();
        }
    }

    public ThreadContainer getContainer() {
        return container;
    }

    public String getThreadName() {
        return threadName;
    }
}

Producer:

public class Producer extends MyThread {

    Producer(String name, ThreadContainer cont) {
        super(name, cont);
    }

    public void produce(int amount) {
        super.getContainer().produce(amount);
    }

    @Override
    public void run() {
        System.out.println("Running " + super.getThreadName());
        try {
            for (int i = 10; i > 0; i--) {

                synchronized (super.getContainer().lock) {
                    System.out.println(super.getThreadName()
                            + " want to produce: " + i);
                    while (!super.getContainer().canProduce(i)) {
                        super.getContainer().lock.wait();
                    }
                    System.out.println(super.getThreadName() + " producing: "
                            + i);
                    super.getContainer().produce(i);
                    System.out.println("Container state: "
                            + super.getContainer());
                }

            }

            Thread.sleep(50);
        } catch (InterruptedException e) {
            System.out.println("Thread " + super.getThreadName()
                    + " interrupted.");
        }

        System.out.println("Thread " + super.getThreadName() + " exiting.");
    }

}

Consumer:

public class Consumer extends MyThread {

    Consumer(String name, ThreadContainer cont) {
        super(name, cont);
    }

    public void consume(int am) {
        super.getContainer().consume(am);
    }

    @Override
    public void run() {
        System.out.println("Running " + super.getThreadName());
        try {
            for (int i = 10; i > 0; i--) {
                synchronized (super.getContainer().lock) {
                    System.out.println(super.getThreadName()
                            + " want to consume: " + i);
                    while (!super.getContainer().canConsume(i)) {
                        super.getContainer().lock.wait();
                    }
                    System.out.println(super.getThreadName() + " consuming: "
                            + i);
                    super.getContainer().consume(i);
                    System.out.println("Container state: "
                            + super.getContainer());
                }
            }

            Thread.sleep(50);
        } catch (InterruptedException e) {
            System.out.println("Thread " + super.getThreadName()
                    + " interrupted.");
        }

        System.out.println("Thread " + super.getThreadName() + " exiting.");
    }

}

Container:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadContainer {
    private int capacity;
    private int value;

    private Lock locky = new ReentrantLock(true);

    public ThreadContainer(int capacity) {
        this.capacity = capacity;
        this.value = 0; 
    }

    public void produce(int amount){
        if(this.value + amount <= this.capacity){
            this.value += amount;
        }else{
            this.value = capacity;
        }
    }

    public void consume(int amount){
        if(this.value - amount >= 0 ){
            this.value -= amount;
        }else{
            this.value =0;
        } 
    }

    public boolean canProduce(int am){
        return (this.value + am) <= this.capacity;
    }

    public boolean canConsume(int am){
        return (this.value - am) >= 0;
    }

    public boolean tryLock(){
        if(this.locky.tryLock()){
            this.locky.lock();
            return true;
        }else{
            return false;
        }
    }

    public void unlock(){
        this.locky.unlock();
        this.locky.notify();
    }

    public void waitLock() throws InterruptedException{
        this.locky.wait();
    }


    @Override
    public String toString() {
        return "capacity: " + this.capacity + ", value: " + this.value;
    }

}

Main class:

public class RunFrom {
    public static void main(String args[]) {
        ThreadContainer container = new ThreadContainer(25);

        /*
        Producer prod = new Producer("Producer", container);
        prod.start();

        Consumer cons = new Consumer("Consumer", container);
        cons.start();
        */

        int prodCount =0;
        int conCount =0;
        for (int i = 0; i < 5; i++) {
            if(i%2 == 0){
                Producer prod = new Producer("Producer " + prodCount + " [" + i + "]", container);
                prodCount++;
                prod.start();
            }else{
                Consumer cons = new Consumer("Consumer " + conCount + " [" + i + "]", container);
                conCount++;
                cons.start();
            }
        }
    }
}

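So, I made the following changes based on the link in @fildor's post. It seems to work with 2 threads (1 consumer and 1 producer), but there are still problems when I create more threads.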

Consumer

//...
try {
    for (int i = 10; i > 0; i--) {
        System.out.println(super.getThreadName() + " want to consume: " + i);
        System.out.println(super.getThreadName() + " consuming: " + i);
        super.getContainer().consume(i);
        System.out.println("Container state: " + super.getContainer());
        Thread.sleep(100);
    }
} catch (InterruptedException e) {
    System.out.println("Thread " + super.getThreadName() + " interrupted.");
}
//...

Producer

//...
try {
    for (int i = 10; i > 0; i--) {
        System.out.println(super.getThreadName() + " want to produce: " + i);
        System.out.println(super.getThreadName() + " producing: " + i);
        super.getContainer().produce(i);
        System.out.println("Container state: " + super.getContainer());
        Thread.sleep(100);
    }
} catch (InterruptedException e) {
    System.out.println("Thread " + super.getThreadName() + " interrupted.");
}
//...

ThreadContainer

//...
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
//...

public void produce(int amount) {
        lock.lock();
        try {
            while (!canProduce(amount)) {
                notFull.wait();
            }

            if (this.value + amount <= this.capacity) {
                this.value += amount;
            } else {
                this.value = capacity;
            }
            notEmpty.signal();
        } catch (InterruptedException e) {
            System.out.println("InterruptedException" + e);
        } finally {
            lock.unlock();
        }
    }

    public void consume(int amount) {
        lock.lock();
        try {
            while (!canConsume(amount)) {
                notEmpty.wait();
            }

            if (this.value - amount >= 0) {
                this.value -= amount;
            } else {
                this.value = 0;
            }
            notFull.signal();
        } catch (InterruptedException e) {
            System.out.println("InterruptedException" + e);
        } finally {
            lock.unlock();
        }
    }

Output with 4 threads (2 producers and 2 consumers):

Creating Producer 0 [0]
Starting Producer 0 [0]
Running Producer 0 [0]
Producer 0 [0] want to produce: 10
Producer 0 [0] producing: 10
Container state: capacity: 25, value: 10
Creating Consumer 0 [1]
Starting Consumer 0 [1]
Creating Producer 1 [2]
Starting Producer 1 [2]
Creating Consumer 1 [3]
Running Consumer 0 [1]
Starting Consumer 1 [3]
Creating Producer 2 [4]
Starting Producer 2 [4]
Running Producer 1 [2]
Producer 1 [2] want to produce: 10
Producer 1 [2] producing: 10
Consumer 0 [1] want to consume: 10
Consumer 0 [1] consuming: 10
Container state: capacity: 25, value: 20
Container state: capacity: 25, value: 10
Running Consumer 1 [3]
Consumer 1 [3] want to consume: 10
Running Producer 2 [4]
Producer 2 [4] want to produce: 10
Producer 2 [4] producing: 10
Container state: capacity: 25, value: 20
Consumer 1 [3] consuming: 10
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 9
Producer 0 [0] producing: 9
Container state: capacity: 25, value: 19
Consumer 0 [1] want to consume: 9
Consumer 0 [1] consuming: 9
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 9
Producer 1 [2] producing: 9
Container state: capacity: 25, value: 19
Producer 2 [4] want to produce: 9
Producer 2 [4] producing: 9
Exception in thread "Producer 2 [4]" Consumer 1 [3] want to consume: 9
Consumer 1 [3] consuming: 9
Container state: capacity: 25, value: 10
java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Unknown Source)
    at test.ThreadContainer.produce(ThreadContainer.java:24)
    at test.Producer.run(Producer.java:21)
    at java.lang.Thread.run(Unknown Source)
Producer 0 [0] want to produce: 8
Producer 0 [0] producing: 8
Container state: capacity: 25, value: 18
Consumer 0 [1] want to consume: 8
Consumer 0 [1] consuming: 8
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 8
Producer 1 [2] producing: 8
Container state: capacity: 25, value: 18
Consumer 1 [3] want to consume: 8
Consumer 1 [3] consuming: 8
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 7
Producer 0 [0] producing: 7
Container state: capacity: 25, value: 17
Consumer 0 [1] want to consume: 7
Consumer 0 [1] consuming: 7
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 7
Producer 1 [2] producing: 7
Container state: capacity: 25, value: 17
Consumer 1 [3] want to consume: 7
Consumer 1 [3] consuming: 7
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 6
Producer 0 [0] producing: 6
Container state: capacity: 25, value: 16
Consumer 0 [1] want to consume: 6
Consumer 0 [1] consuming: 6
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 6
Producer 1 [2] producing: 6
Container state: capacity: 25, value: 16
Consumer 1 [3] want to consume: 6
Consumer 1 [3] consuming: 6
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 5
Producer 0 [0] producing: 5
Container state: capacity: 25, value: 15
Consumer 0 [1] want to consume: 5
Consumer 0 [1] consuming: 5
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 5
Producer 1 [2] producing: 5
Container state: capacity: 25, value: 15
Consumer 1 [3] want to consume: 5
Consumer 1 [3] consuming: 5
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 4
Producer 0 [0] producing: 4
Container state: capacity: 25, value: 14
Consumer 0 [1] want to consume: 4
Consumer 0 [1] consuming: 4
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 4
Producer 1 [2] producing: 4
Container state: capacity: 25, value: 14
Consumer 1 [3] want to consume: 4
Consumer 1 [3] consuming: 4
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 3
Producer 0 [0] producing: 3
Container state: capacity: 25, value: 13
Consumer 0 [1] want to consume: 3
Consumer 0 [1] consuming: 3
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 3
Producer 1 [2] producing: 3
Container state: capacity: 25, value: 13
Consumer 1 [3] want to consume: 3
Consumer 1 [3] consuming: 3
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 2
Producer 0 [0] producing: 2
Container state: capacity: 25, value: 12
Consumer 0 [1] want to consume: 2
Consumer 0 [1] consuming: 2
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 2
Producer 1 [2] producing: 2
Container state: capacity: 25, value: 12
Consumer 1 [3] want to consume: 2
Consumer 1 [3] consuming: 2
Container state: capacity: 25, value: 10
Producer 0 [0] want to produce: 1
Producer 0 [0] producing: 1
Container state: capacity: 25, value: 11
Consumer 0 [1] want to consume: 1
Consumer 0 [1] consuming: 1
Container state: capacity: 25, value: 10
Producer 1 [2] want to produce: 1
Producer 1 [2] producing: 1
Container state: capacity: 25, value: 11
Consumer 1 [3] want to consume: 1
Consumer 1 [3] consuming: 1
Container state: capacity: 25, value: 10
Thread Producer 0 [0] exiting.
Thread Consumer 0 [1] exiting.
Thread Producer 1 [2] exiting.
Thread Consumer 1 [3] exiting.

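Isn't the problem that several consumers are waiting on the same signal, just as several producers are waiting on the other signal?

You should really read a "Threads in Java 101" tutorial somewhere. The exception you are getting is thrown because you are waiting on an object without having acquired its intrinsic lock. Given any object identified by lock, the idiomatic code is: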

synchronized (lock) {
  while (!condition) {
    lock.wait();
  }
}
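
The same guarded-wait shape carries over to the ReentrantLock/Condition version in the question's edit. A Condition is waited on with await() and woken with signal() or signalAll(); calling wait() on it is still plain Object.wait(), which needs the Condition object's own monitor and therefore throws IllegalMonitorStateException. Below is a minimal sketch, not a drop-in replacement. The class name ConditionContainer, the value() accessor, and the worker helper are my own additions for illustration, and I am assuming an equal number of producers and consumers so that the run terminates. I use signalAll() rather than signal() because the waiters ask for different amounts and each has to recheck its own condition.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical name: a sketch of the question's container using await()/signalAll().
class ConditionContainer {
    private final int capacity;
    private int value = 0;

    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    ConditionContainer(int capacity) {
        this.capacity = capacity;
    }

    // Blocks until `amount` fits into the container, then adds it.
    void produce(int amount) throws InterruptedException {
        lock.lock();
        try {
            while (value + amount > capacity) {
                notFull.await();   // releases `lock` while waiting; this is not Object.wait()
            }
            value += amount;
            notEmpty.signalAll();  // every waiting consumer rechecks its own amount
        } finally {
            lock.unlock();
        }
    }

    // Blocks until `amount` is available, then removes it.
    void consume(int amount) throws InterruptedException {
        lock.lock();
        try {
            while (value - amount < 0) {
                notEmpty.await();
            }
            value -= amount;
            notFull.signalAll();   // every waiting producer rechecks its own amount
        } finally {
            lock.unlock();
        }
    }

    // Added for the demo: reads the current value under the lock.
    int value() {
        lock.lock();
        try {
            return value;
        } finally {
            lock.unlock();
        }
    }

    // Demo helper (assumption): a thread that produces or consumes 10, 9, ..., 1.
    static Thread worker(String name, ConditionContainer c, boolean producing) {
        return new Thread(() -> {
            try {
                for (int i = 10; i > 0; i--) {
                    if (producing) {
                        c.produce(i);
                    } else {
                        c.consume(i);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
            }
        }, name);
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionContainer container = new ConditionContainer(25);
        Thread[] threads = {
            worker("Producer 0", container, true),
            worker("Consumer 0", container, false),
            worker("Producer 1", container, true),
            worker("Consumer 1", container, false),
        };
        for (Thread t : threads) t.start();
        for (Thread t : threads) t.join();
        System.out.println("Final value: " + container.value()); // prints "Final value: 0"
    }
}
```

Note that with more producers than consumers (or the reverse), the extra threads would block forever once the container is full (or empty), so a real program needs some way to shut them down.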

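Why so complicated?

What is the benefit of your MyThread class? All I see in it is code that starts a thread. I don't need to define a new class for that. I can start a new thread with one line of code:

new Thread(new Producer(...)).start();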
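
For instance, a task can simply implement Runnable and be handed straight to Thread, with no wrapper class in between. A small sketch; CountingTask and PlainRunnableDemo are made-up names for illustration only:

```java
import java.util.concurrent.atomic.AtomicInteger;

class PlainRunnableDemo {
    // Hypothetical task: all it knows is its own work, not how it gets started.
    static class CountingTask implements Runnable {
        private final AtomicInteger counter;

        CountingTask(AtomicInteger counter) {
            this.counter = counter;
        }

        @Override
        public void run() {
            counter.incrementAndGet();
        }
    }

    // Starts the task on a new thread, waits for it, and returns the counter.
    static int runOnce() throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        Thread t = new Thread(new CountingTask(counter), "Worker");
        t.start();   // note the parentheses: start is a method call
        t.join();    // wait until run() has returned
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce()); // prints 1
    }
}
```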

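Then there is your ThreadContainer class, which has some methods that are only meant to be called by consumer threads and other methods that are only meant to be called by producer threads. That violates a basic design principle: each class should be responsible for only one thing.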

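A lot of your code is glue that binds things to other things. (For example, getter and setter methods are glue, and your MyThread class is nothing but glue.) Binding things to other things is called coupling, and when you have a lot of it, it is called tight coupling.

The more tightly coupled the parts of a program are, the harder the program is to understand and to change. Tightly coupled systems are more likely to break, and when they do break they are harder to diagnose and fix.

Loose coupling is always better: don't make classes and methods depend on each other when there is absolutely no need.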


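Finally, if I wanted to demonstrate the producer/consumer concept, I would not clutter the demonstration with wait()/notify(). The wait() and notify() methods are low-level primitives meant for implementing higher-level synchronization objects, and they should be hidden from code that operates at a higher level.

If you want to demonstrate wait()/notify(), that's one thing. If you want to demonstrate producer/consumer, that's another. I would not try to cram both demonstrations into one program. Each concept would only get in the way of showing off the other.

The Java standard library provides plenty of ready-made, higher-level synchronization objects for you to use. Probably the most versatile one is BlockingQueue. My producer/consumer demonstration would have a producer thread that stuffs "products" into an ArrayBlockingQueue, and a consumer thread that pulls them out and operates on them.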
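
Something along these lines, as a sketch rather than a finished demo. The class name BlockingQueueDemo, the run helper, and the POISON_PILL sentinel that tells the consumer to stop are my own illustrative choices; put() and take() are the blocking operations that BlockingQueue provides, so no explicit lock, wait, or notify appears anywhere.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {
    // Hypothetical sentinel value: tells the consumer that no more products follow.
    private static final int POISON_PILL = -1;

    // Runs one producer and one consumer; returns the products the consumer received.
    static List<Integer> run(int itemCount, int queueCapacity) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(queueCapacity);
        List<Integer> consumed = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= itemCount; i++) {
                    queue.put(i);          // blocks while the queue is full
                }
                queue.put(POISON_PILL);    // signal the end of production
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int item = queue.take();   // blocks while the queue is empty
                    if (item == POISON_PILL) {
                        break;
                    }
                    consumed.add(item);        // "operate" on the product
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();   // after join, reading `consumed` from this thread is safe
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        // Queue capacity 3 forces the producer to block and wait for the consumer.
        System.out.println(run(10, 3)); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

With several consumers, one sentinel per consumer would be needed (or some other shutdown signal); that is left out here to keep the demonstration focused on the queue.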