使用条件变量实现的 Java 生产者-消费者中的阻塞问题

Blocking in Producer Consumer Java Implementation using Condition Variables

我在使用条件变量实现生产者-消费者时遇到了消费者阻塞的问题:基本上,消费者线程不会取走生产者生产的最后一批产品,因此永远不会结束。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Producer/consumer demo that shares one list between two tasks and coordinates them
 * with a single {@link ReentrantLock} and two {@link Condition}s.
 *
 * NOTE(review): this listing is the version that hangs. Both tasks call
 * {@code lock.lock()} on every loop iteration but call {@code lock.unlock()} only once,
 * after the loop, so the reentrant hold count is never brought back to zero.
 */
public class ProducerConsumerImpl {

    // Buffer shared by the producer and the consumer tasks.
    protected List<Integer> buffer = new ArrayList<Integer>();

    // The lock from which both condition variables below are created.
    protected volatile Lock lock = new ReentrantLock(true);
    // Awaited by the producer while the buffer holds more than two items;
    // signalled by the consumer each time it removes an item.
    protected volatile Condition producerStartProducing = lock.newCondition();
    // Awaited by the consumer while the buffer is empty;
    // signalled by the producer each time it adds an item.
    protected volatile Condition consumerStartConsuming = lock.newCondition();

    /** Task that removes ten items from the shared buffer, one per loop iteration. */
    class Consumer implements Callable<String> {

        // Identifier passed in by the caller; not read anywhere in this listing.
        int num;

        public Consumer(int i) {
            this.num = i;
        }

        /**
         * Loops ten times: waits while the buffer is empty, takes the lock, removes the
         * last item of the buffer and signals the producer.
         *
         * @return the string {@code "Consumed All"} once ten items have been removed
         * @throws Exception if the wait is interrupted or a lock/condition call fails
         */
        @Override
        public String call() throws Exception {
            try {
                int i = 0;
                while (i < 10) {
                    // NOTE(review): on the first iteration this check and await() run before
                    // lock.lock() below. Condition.await() requires the lock to be held, so an
                    // empty buffer at that point would raise IllegalMonitorStateException.
                    while (buffer.isEmpty()) {
                        consumerStartConsuming.await();
                    }
                    // Acquired again on every iteration with no matching unlock() inside the
                    // loop, so the hold count of the reentrant lock grows by one per item.
                    lock.lock();
                    System.out.println("Consumer - " + i);
                    buffer.remove(buffer.size() - 1);
                    producerStartProducing.signalAll();
                    i++;
                }
            } finally {
                // Single unlock() for up to ten lock() calls; it also runs when lock() was
                // never reached, in which case unlock() itself would fail.
                lock.unlock();
            }
            System.out.println("Consumed All");
            return "Consumed All";

        }

    }

    /** Task that adds ten items to the shared buffer, one per loop iteration. */
    class Producer implements Callable<String> {

        // Identifier passed in by the caller; not read anywhere in this listing.
        int num;

        public Producer(int i) {
            this.num = i;
        }

        /**
         * Loops ten times: takes the lock, waits while the buffer holds more than two
         * items, adds one item and signals the consumer.
         *
         * @return the string {@code "Produced All"} once ten items have been added
         * @throws Exception if the wait is interrupted or a lock/condition call fails
         */
        @Override
        public String call() throws Exception {
            try {
                int i = 0;
                while (i < 10) {
                    // Acquired on every iteration; never released inside the loop.
                    lock.lock();
                    while (buffer.size() > 2) {
                        producerStartProducing.await();
                    }
                    System.out.println("Producer - " + i);
                    buffer.add(1);
                    consumerStartConsuming.signalAll();
                    i++;
                }
            } finally {
                // NOTE(review): one unlock() after ten lock() calls leaves this thread still
                // holding the lock when the method returns. A consumer blocked in await()
                // then cannot re-acquire it, which appears to be the hang seen after
                // "Produced All" in the posted output.
                lock.unlock();
            }
            System.out.println("Produced All");
            return "Produced All";
        }

    }

    /**
     * Submits one producer and one consumer to a four-thread pool, waits for both results
     * and prints them on one line.
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(4);
        try {
            ProducerConsumerImpl producerConsumerImpl = new ProducerConsumerImpl();
            Future<String> p1 = newFixedThreadPool.submit(producerConsumerImpl.new Producer(1));
            Future<String> c1 = newFixedThreadPool.submit(producerConsumerImpl.new Consumer(1));
            final String string1 = p1.get();
            // Blocks until the consumer task finishes.
            final String string2 = c1.get();
            System.out.println(string1 + " --- " + string2);
        } finally {
            newFixedThreadPool.shutdown();
        }
    }

}

输出:

Producer - 0
Producer - 1
Producer - 2
Consumer - 0
Consumer - 1
Consumer - 2
Producer - 3
Producer - 4
Producer - 5
Consumer - 3
Consumer - 4
Consumer - 5
Producer - 6
Producer - 7
Producer - 8
Consumer - 6
Consumer - 7
Consumer - 8
Producer - 9
Produced All

生产者每批生产三个产品,然后在条件变量上等待,直到消费者取走产品并发出信号。

同样,消费者线程每次从缓冲区中取出一个产品后,都会向生产者发出信号,让它继续生产。

这只是一种随意的实现,但在这段代码中,这类操作的其他组合也同样存在问题。如果有人能指出这段代码到底哪里出了问题,我将不胜感激。

加锁的范围不正确:原代码在循环的每次迭代中都调用 lock.lock(),却只在循环结束后调用一次 lock.unlock(),因此锁始终没有被真正释放。我已在下面的代码中修复了这个问题。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Bounded-buffer producer/consumer demo coordinated by one {@link ReentrantLock} and two
 * {@link Condition}s.
 *
 * <p>Each task acquires the lock once per item and releases it in a {@code finally}
 * block within the same iteration, so every {@code lock()} is paired with exactly one
 * {@code unlock()} and the other task can always re-acquire the lock after being
 * signalled.
 */
public class ProducerConsumerImpl {

    /** Number of items each task produces or consumes before it finishes. */
    private static final int ITEM_COUNT = 10;

    /** Maximum number of items the buffer may hold before the producer has to wait. */
    private static final int BUFFER_CAPACITY = 3;

    /** Shared buffer; only read or modified while {@link #lock} is held. */
    protected List<Integer> buffer = new ArrayList<Integer>();

    /**
     * Fair lock guarding {@link #buffer}. Declared {@code final} because both conditions
     * below are bound to this exact instance and it must never be swapped out.
     */
    protected final Lock lock = new ReentrantLock(true);

    /** Awaited by the producer while the buffer is full; signalled by the consumer. */
    protected final Condition producerStartProducing = lock.newCondition();

    /** Awaited by the consumer while the buffer is empty; signalled by the producer. */
    protected final Condition consumerStartConsuming = lock.newCondition();

    /** Task that removes {@link #ITEM_COUNT} items from the shared buffer. */
    class Consumer implements Callable<String> {

        /** Identifier supplied by the caller; kept for parity with {@link Producer}. */
        int num;

        public Consumer(int i) {
            this.num = i;
        }

        /**
         * For each item: takes the lock, waits until the buffer is non-empty, removes
         * the most recently added element, wakes the producer and releases the lock.
         *
         * @return {@code "Consumed All"} once every item has been removed
         * @throws Exception if the thread is interrupted while waiting
         */
        @Override
        public String call() throws Exception {
            for (int i = 0; i < ITEM_COUNT; i++) {
                // Acquire outside the try block so the finally clause only runs unlock()
                // when the lock is actually held.
                lock.lock();
                try {
                    // Loop guards against spurious wake-ups.
                    while (buffer.isEmpty()) {
                        consumerStartConsuming.await();
                    }
                    System.out.println("Consumer - " + i);
                    buffer.remove(buffer.size() - 1);
                    producerStartProducing.signalAll();
                } finally {
                    lock.unlock();
                }
            }

            System.out.println("Consumed All");
            return "Consumed All";
        }

    }

    /** Task that adds {@link #ITEM_COUNT} items to the shared buffer. */
    class Producer implements Callable<String> {

        /** Identifier supplied by the caller; kept for parity with {@link Consumer}. */
        int num;

        public Producer(int i) {
            this.num = i;
        }

        /**
         * For each item: takes the lock, waits while the buffer already holds
         * {@link #BUFFER_CAPACITY} items, adds one element, wakes the consumer and
         * releases the lock.
         *
         * @return {@code "Produced All"} once every item has been added
         * @throws Exception if the thread is interrupted while waiting
         */
        @Override
        public String call() throws Exception {
            for (int i = 0; i < ITEM_COUNT; i++) {
                // Acquire outside the try block so the finally clause only runs unlock()
                // when the lock is actually held.
                lock.lock();
                try {
                    // Loop guards against spurious wake-ups.
                    while (buffer.size() >= BUFFER_CAPACITY) {
                        producerStartProducing.await();
                    }
                    System.out.println("Producer - " + i);
                    buffer.add(1);
                    consumerStartConsuming.signalAll();
                } finally {
                    lock.unlock();
                }
            }

            System.out.println("Produced All");
            return "Produced All";
        }

    }

    /**
     * Runs one producer and one consumer on a small thread pool, waits for both and
     * prints their results on a single line.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting
     * @throws ExecutionException if either task terminates with an exception
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            ProducerConsumerImpl demo = new ProducerConsumerImpl();
            Future<String> producerResult = pool.submit(demo.new Producer(1));
            Future<String> consumerResult = pool.submit(demo.new Consumer(1));
            final String produced = producerResult.get();
            final String consumed = consumerResult.get();
            System.out.println(produced + " --- " + consumed);
        } finally {
            // shutdownNow() interrupts a task still parked in await() when its partner
            // failed, so the JVM can exit; on the normal path both tasks are already done.
            pool.shutdownNow();
        }
    }

}