Java 中的多个消费者生产者

Multiple Consumer Producer In Java

我尝试为消费者创建 2 个线程,为生产者创建 2 个线程。 所有 4 个线程都在争夺一个资源。 其中两个正在尝试从资源中消耗,另外两个正在尝试生产。

下面是代码

package com.threading;

import java.util.ArrayList;
import java.util.List;

public class TestConsumerProducer2 {

    protected static int maxSize = 2;

    static class Consumer implements Runnable {
        List<Integer> goods;

        public Consumer(List<Integer> goods) {
            this.goods = goods;
        }

        public void consume() {
            synchronized (goods) {

                if (goods.size() <= 0) {
                    try {
                        goods.wait();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + " >>>> consuming >>>" + goods.remove(0));
                goods.notifyAll();
            }
        }

        @Override
        public void run() {
            for (int i = 0; i < 10; i++) {
                consume();
                try {
                    Thread.currentThread().sleep(100);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }

    static class Producer implements Runnable {
        List<Integer> goods;

        public Producer(List<Integer> goods) {
            this.goods = goods;
        }

        public void produce(int i) {
            synchronized (goods) {

                if (goods.size() >= maxSize) {
                    try {
                        goods.wait();
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + ">>> producing >> " + i);
                goods.add(i);
                goods.notifyAll();
            }
        }

        @Override
        public void run() {
            // TODO Auto-generated method stub
            for (int i = 0; i < 10; i++) {
                produce(i);
                try {
                    Thread.currentThread().sleep(100);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> goods = new ArrayList<>();
        Consumer consumer = new Consumer(goods);
        Producer producer = new Producer(goods);
        Thread consumerWorker1 = new Thread(consumer);
        Thread consumerWorker2 = new Thread(consumer);

        Thread prroducerWorker1 = new Thread(producer);
        Thread prroducerWorker2 = new Thread(producer);

        consumerWorker1.start();
        consumerWorker2.start();
        prroducerWorker1.start();
        prroducerWorker2.start();

        try {
            consumerWorker1.join();
            consumerWorker2.join();
            prroducerWorker1.join();
            prroducerWorker2.join();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        System.out.println("Job completed >>>>");
    }
}

程序的输出

Thread-2>>> producing >> 0

Thread-1 >>>> consuming >>>0

Thread-3>>> producing >> 0

Exception in thread "Thread-0" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 at java.util.ArrayList.rangeCheck(ArrayList.java:657) at java.util.ArrayList.remove(ArrayList.java:496) at com.threading.TestConsumerProducer2$Consumer.consume(TestConsumerProducer2.java:27) at com.threading.TestConsumerProducer2$Consumer.run(TestConsumerProducer2.java:35) at java.lang.Thread.run(Thread.java:748) Thread-2>>> producing >> 1

Thread-1 >>>> consuming >>>0

Thread-3>>> producing >> 1

Thread-1 >>>> consuming >>>1

Thread-2>>> producing >> 2

Thread-1 >>>> consuming >>>1

Thread-3>>> producing >> 2

Thread-1 >>>> consuming >>>2

Thread-2>>> producing >> 3

Thread-1 >>>> consuming >>>2

Thread-2>>> producing >> 4

Thread-3>>> producing >> 3

Thread-1 >>>> consuming >>>3

Thread-2>>> producing >> 5

Thread-1 >>>> consuming >>>4

Thread-3>>> producing >> 4

Thread-1 >>>> consuming >>>3

Thread-2>>> producing >> 6 Thread-1 >>>> consuming >>>5

Thread-2>>> producing >> 7

Thread-3>>> producing >> 5

问题陈述: 为什么 none 个线程执行了 10 次? 代码中的死锁情况在哪里? 为什么当 Goods 对象被消费者线程锁定时会出现 IndexOutOfBoundsException 并且如果 size <=0 它应该进入等待状态?

将代码中出现的 if 更改为 while 可以解决问题。

Guarded Blocks tutorial on Oracle's site里面有相关的建议,里面写着:

Note: Always invoke wait inside a loop that tests for the condition being waited for. Don't assume that the interrupt was for the particular condition you were waiting for, or that the condition is still true.

(他们所说的中断是指 return 来自等待,不一定是来自某人调用 Thread.interrupt 的实际中断。)

要点:

  • 线程只有在检查时持有锁才能知道商品列表的内容。

  • 调用 wait 会放弃锁,允许其他线程在这个线程休眠时取得进展。

  • 一旦线程放弃锁,它之前对货物列表状态所做的任何检查都不再有效。

一旦线程return从等待中退出,它就重新获得了锁,但线程需要重新评估条件检查,否则它会根据陈旧的信息进行操作。在线程上次检查条件的时间和当前时间之间可能发生了很多事情。您得到 IllegalArgumentException 是因为当前线程假设当前线程正在等待时另一个线程删除了某些东西。

package com.threading;

import java.util.ArrayList; 

import java.util.List;

public class TestConsumerProducer2 {

protected static int maxSize = 2;

static class Consumer implements Runnable {
    List<Integer> goods;

    public Consumer(List<Integer> goods) {
        this.goods = goods;
    }

    public void consume() {
        synchronized (goods) {

            while (goods.size() <= 0) {
                try {
                    goods.wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + " >>>> consuming >>>" + goods.remove(0));
            goods.notifyAll();
        }
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            consume();
            try {
                Thread.currentThread().sleep(100);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

static class Producer implements Runnable {
    List<Integer> goods;

    public Producer(List<Integer> goods) {
        this.goods = goods;
    }

    public void produce(int i) {
        synchronized (goods) {

            while (goods.size() >= maxSize) {
                try {
                    goods.wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + ">>> producing >> " + i);
            goods.add(i);
            goods.notifyAll();
        }
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        for (int i = 0; i < 10; i++) {
            produce(i);
            try {
                Thread.currentThread().sleep(100);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

public static void main(String[] args) {
    List<Integer> goods = new ArrayList<>();
    Consumer consumer = new Consumer(goods);
    Producer producer = new Producer(goods);
    Thread consumerWorker1 = new Thread(consumer);
    Thread consumerWorker2 = new Thread(consumer);

    Thread prroducerWorker1 = new Thread(producer);
    Thread prroducerWorker2 = new Thread(producer);

    consumerWorker1.start();
    consumerWorker2.start();
    prroducerWorker1.start();
    prroducerWorker2.start();

    try {
        consumerWorker1.join();
        consumerWorker2.join();
        prroducerWorker1.join();
        prroducerWorker2.join();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    System.out.println("Job completed >>>>");
}

}

代码现在成功完成,输出如下:

C:\>java com.threading.TestConsumerProducer2
Thread-2>>> producing >> 0
Thread-1 >>>> consuming >>>0
Thread-3>>> producing >> 0
Thread-0 >>>> consuming >>>0
Thread-2>>> producing >> 1
Thread-3>>> producing >> 1
Thread-0 >>>> consuming >>>1
Thread-1 >>>> consuming >>>1
Thread-2>>> producing >> 2
Thread-3>>> producing >> 2
Thread-0 >>>> consuming >>>2
Thread-1 >>>> consuming >>>2
Thread-2>>> producing >> 3
Thread-0 >>>> consuming >>>3
Thread-3>>> producing >> 3
Thread-1 >>>> consuming >>>3
Thread-2>>> producing >> 4
Thread-0 >>>> consuming >>>4
Thread-3>>> producing >> 4
Thread-1 >>>> consuming >>>4
Thread-2>>> producing >> 5
Thread-0 >>>> consuming >>>5
Thread-3>>> producing >> 5
Thread-1 >>>> consuming >>>5
Thread-2>>> producing >> 6
Thread-0 >>>> consuming >>>6
Thread-3>>> producing >> 6
Thread-1 >>>> consuming >>>6
Thread-2>>> producing >> 7
Thread-0 >>>> consuming >>>7
Thread-3>>> producing >> 7
Thread-1 >>>> consuming >>>7
Thread-2>>> producing >> 8
Thread-0 >>>> consuming >>>8
Thread-3>>> producing >> 8
Thread-1 >>>> consuming >>>8
Thread-2>>> producing >> 9
Thread-0 >>>> consuming >>>9
Thread-3>>> producing >> 9
Thread-1 >>>> consuming >>>9
Job completed >>>>