线程不会根据条件终止,生产者-消费者线程

Thread doesn't terminate upon the condition, Producer-consumer threads

我想实现一个相当简单的任务。有 2 个队列(均有容量限制):BlockingQueue<String> source 和 BlockingQueue<String> destination。有两种类型的线程:第一种是 Producer producer,它生成一条消息并存入 BlockingQueue<String> source;第二种是 Replacer replacer,它从 source 中取出消息,对其进行转换,然后插入 BlockingQueue<String> destination。

两个问题:

  1. 我不确定我是否正确实现了以下要求:如果源不为空且目标未满,则将消息从源传输到目标。

  2. 我的程序执行完毕后,仍然有一个名为 "Signal Dispatcher" 的线程在运行。我怎样才能正确终止它?我的程序没有正确终止。

以下是相关实体的实现:

source/destination 队列的实现。

/**
 * Bounded blocking queue guarded by this object's intrinsic lock.
 *
 * <p>{@code offer} blocks while the queue is full and {@code poll} blocks while it is empty.
 * Elements are kept in a {@link PriorityQueue}, so {@code poll} returns the least element
 * according to the elements' natural ordering rather than the oldest one.
 *
 * @param <E> element type; must be mutually comparable because of the backing
 *            {@link PriorityQueue}
 */
public class BlockingQueueImpl<E> implements BlockingQueue<E> {
    /** Backing storage; only accessed while holding this object's monitor. */
    private final Queue<E> storage;
    /** Maximum number of elements the queue may hold at once. */
    private final int capacity;

    /**
     * Creates an empty queue that holds at most {@code capacity} elements.
     *
     * @param capacity maximum number of stored elements
     * @throws IllegalArgumentException if {@code capacity} is less than 1 (raised by
     *         {@link PriorityQueue})
     */
    public BlockingQueueImpl(int capacity) {
        this.capacity = capacity;
        this.storage = new PriorityQueue<E>(capacity);
    }

    /**
     * Adds {@code element}, waiting while the queue is full.
     *
     * <p>An interrupt received while waiting does not abort the wait; the thread's interrupt
     * status is restored before this method returns so callers can still observe it.
     *
     * @param element element to add
     */
    @Override
    public synchronized void offer(E element) {
        boolean interrupted = false;
        try {
            while (isFull()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep waiting; re-asserted in finally.
                    interrupted = true;
                }
            }
            storage.add(element);
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the head of the queue, waiting while the queue is empty.
     *
     * <p>An interrupt received while waiting does not abort the wait; the thread's interrupt
     * status is restored before this method returns.
     *
     * @return the removed element
     */
    @Override
    public synchronized E poll() {
        boolean interrupted = false;
        try {
            while (isEmpty()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep waiting; re-asserted in finally.
                    interrupted = true;
                }
            }
            E head = storage.poll();
            notifyAll();
            return head;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the configured capacity.
     *
     * <p>NOTE(review): despite the name this is the capacity, not the current element count;
     * kept as-is because callers may rely on it — confirm against the interface contract.
     *
     * @return the maximum number of elements this queue can hold
     */
    @Override
    public int size() {
        return capacity;
    }

    /**
     * Reports whether the queue has reached its capacity.
     *
     * @return {@code true} when no further element can be added without waiting
     */
    public synchronized boolean isFull() {
        // ">=" rather than ">": with ">" the queue would accept capacity + 1 elements.
        return storage.size() >= capacity;
    }

    /**
     * Reports whether the queue currently holds no elements.
     *
     * @return {@code true} when the queue is empty
     */
    public synchronized boolean isEmpty() {
        return storage.isEmpty();
    }
}

生产者的实现

/**
 * Runnable that fills a source queue with messages generated for its thread name.
 */
public class Producer implements Runnable {
    /** Queue that receives the generated messages. */
    BlockingQueue<String> source;
    /** Name embedded in every generated message. */
    String threadName;

    /**
     * Creates a producer bound to the given queue.
     *
     * @param source     queue to push generated messages into
     * @param threadName name used when generating messages
     */
    public Producer(BlockingQueue<String> source, String threadName) {
        this.threadName = threadName;
        this.source = source;
    }

    /**
     * Generates and offers messages until the source queue reports that it is full,
     * then returns.
     */
    @Override
    public void run() {
        for (;;) {
            if (source.isFull()) {
                return;
            }
            String message = Utilities.generateMessage(threadName);
            source.offer(message);
        }
    }
}

消费者的实现

/**
 * Runnable that moves messages from a source queue to a destination queue,
 * transforming each message on the way.
 */
public class Replacer implements Runnable {
    /** Queue messages are taken from. */
    BlockingQueue<String> source;
    /** Queue transformed messages are put into. */
    BlockingQueue<String> destination;
    /** Name embedded in every transformed message. */
    String threadName;

    /**
     * Creates a replacer working between the two given queues.
     *
     * @param source      queue to take messages from
     * @param destination queue to put transformed messages into
     * @param threadName  name used when transforming messages
     */
    public Replacer(BlockingQueue<String> source,
                    BlockingQueue<String> destination,
                    String threadName) {
        this.threadName = threadName;
        this.destination = destination;
        this.source = source;
    }

    /**
     * Takes one message from the source, transforms it and offers the result to the
     * destination. Synchronized on this {@code Replacer} instance only.
     */
    public synchronized void replace() {
        String taken = source.poll();
        String transformed = Utilities.transformMessage(threadName, taken);
        destination.offer(transformed);
    }

    /**
     * Tells whether another transfer should be attempted: the destination must not be
     * full and the source must not be empty.
     *
     * <p>The result is only a snapshot; the queues may change between this check and the
     * following {@link #replace()} call.
     */
    private boolean isRunning() {
        if (destination.isFull()) {
            return false;
        }
        return !source.isEmpty();
    }

    /**
     * Keeps transferring messages while {@code isRunning()} holds, then returns.
     */
    @Override
    public void run() {
        for (;;) {
            if (!isRunning()) {
                return;
            }
            replace();
        }
    }
}

还有辅助类

    /**
     * Shared constants and helpers for the producer/replacer demo: message formatting and
     * daemon-thread start-up.
     */
    public class Utilities {

        public static final int NUMBER_OF_PRODUCER_THREADS = 3;
        public static final int NUMBER_OF_REPLACER_THREADS = 1000;
        public static final int NUMBER_OF_MESSAGES_TO_READ = 1000;
        public static final int STORAGE_CAPACITY = 100;

        /**
         * Builds the "transferred" message for a replacer thread from the last
         * space-separated token of the original message.
         *
         * @param threadName         name of the transferring thread
         * @param messageToTransform original message; must not be {@code null}
         * @return {@code "Thread #<threadName> transferred message <lastToken>"}, where the
         *         last token is empty if the message contains nothing but spaces
         * @throws NullPointerException if {@code messageToTransform} is {@code null}
         */
        public static String transformMessage(String threadName, String messageToTransform) {
            String[] tokens = messageToTransform.split(" ");
            // split() drops trailing empty strings, so a message made only of spaces yields
            // an empty array; indexing length - 1 would then throw.
            String lastToken = tokens.length == 0 ? "" : tokens[tokens.length - 1];
            return "Thread #" + threadName + " transferred message " + lastToken;
        }

        /**
         * Builds the message a producer thread puts into the source queue.
         *
         * @param threadName name of the generating thread
         * @return {@code "Thread #<threadName> generated message #<threadName>"}
         */
        public static String generateMessage(String threadName) {
            return "Thread #" + threadName + " generated message #" + threadName;
        }

        /**
         * Starts {@code numberOfThreadsToSpawn} daemon threads named
         * {@code threadName + 1 .. threadName + n}.
         *
         * <p>When {@code destination} is {@code null} the threads run {@link Producer}s on
         * {@code source}; otherwise they run {@link Replacer}s from {@code source} to
         * {@code destination}.
         *
         * @param threadName             prefix for thread names
         * @param numberOfThreadsToSpawn number of threads to start
         * @param source                 source queue shared by all spawned tasks
         * @param destination            destination queue, or {@code null} to spawn producers
         */
        public static void spawnDaemonThreads(String threadName,
                                              int numberOfThreadsToSpawn,
                                              BlockingQueue<String> source,
                                              BlockingQueue<String> destination) {

            for (int i = 1; i <= numberOfThreadsToSpawn; i++) {
                String name = threadName + i;
                Runnable task;
                if (destination == null) {
                    task = new Producer(source, name);
                } else {
                    task = new Replacer(source, destination, name);
                }
                startDaemon(task, name);
            }
        }

        /** Starts {@code task} on a new daemon thread with the given name. */
        private static void startDaemon(Runnable task, String name) {
            Thread thread = new Thread(task);
            thread.setName(name);
            thread.setDaemon(true);
            thread.start();
        }
    }

主类:

/**
 * Entry point of the demo: wires up the two bounded queues, starts the producer and
 * replacer daemon threads and prints messages taken from the destination queue.
 */
public class Main {
    /**
     * Starts the worker threads and prints up to
     * {@link Utilities#NUMBER_OF_MESSAGES_TO_READ} messages from the destination queue.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        BlockingQueue<String> source = new BlockingQueueImpl<>(Utilities.STORAGE_CAPACITY);
        BlockingQueue<String> destination = new BlockingQueueImpl<>(Utilities.STORAGE_CAPACITY);

        // Create, configure and start PRODUCER threads.
        Utilities.spawnDaemonThreads("Producer", Utilities.NUMBER_OF_PRODUCER_THREADS, source, null);

        // Create, configure and start REPLACER threads.
        Utilities.spawnDaemonThreads("Replacer", Utilities.NUMBER_OF_REPLACER_THREADS, source, destination);

        // Read up to NUMBER_OF_MESSAGES_TO_READ from destination. Counting from 0 so that
        // the full quota can be read (starting at 1 with "<" stopped one message short).
        // The loop also stops as soon as destination is observed empty, so fewer messages
        // may be printed if the workers have not delivered anything yet.
        for (int read = 0;
             (read < Utilities.NUMBER_OF_MESSAGES_TO_READ) && !destination.isEmpty();
             read++) {
            System.out.println(destination.poll());
        }
    }
}

以下是可以正常工作的代码。

     /**
      * Class {@code BlockingQueueImpl} is a bounded blocking queue guarded by this object's
      * intrinsic lock. It provides the thread-safe operations
      * {@code public void offer(E element)} and {@code public E poll()}: {@code offer} blocks
      * while the queue is full and {@code poll} blocks while it is empty.
      *
      * <p>Elements are kept in a {@link PriorityQueue}, so {@code poll} returns the least
      * element according to the elements' natural ordering rather than the oldest one.
      *
      * @param <E> element type; must be mutually comparable because of the backing
      *            {@link PriorityQueue}
      */
public class BlockingQueueImpl<E> implements BlockingQueue<E> {
    /** Backing storage; only accessed while holding this object's monitor. */
    private final Queue<E> storage;
    /** Maximum number of elements the queue may hold at once. */
    private final int capacity;

    /**
     * Creates an empty queue that holds at most {@code capacity} elements.
     *
     * @param capacity maximum number of stored elements
     * @throws IllegalArgumentException if {@code capacity} is less than 1 (raised by
     *         {@link PriorityQueue})
     */
    public BlockingQueueImpl(int capacity) {
        this.capacity = capacity;
        this.storage = new PriorityQueue<E>(capacity);
    }

    /**
     * Adds {@code element}, waiting while the queue is full.
     *
     * <p>An interrupt received while waiting does not abort the wait; the thread's interrupt
     * status is restored before this method returns so callers can still observe it.
     *
     * @param element element to add
     */
    @Override
    public synchronized void offer(E element) {
        boolean interrupted = false;
        try {
            while (isFull()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep waiting; re-asserted in finally.
                    interrupted = true;
                }
            }
            storage.add(element);
            notifyAll();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Removes and returns the head of the queue, waiting while the queue is empty.
     *
     * <p>An interrupt received while waiting does not abort the wait; the thread's interrupt
     * status is restored before this method returns.
     *
     * @return the removed element
     */
    @Override
    public synchronized E poll() {
        boolean interrupted = false;
        try {
            while (isEmpty()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep waiting; re-asserted in finally.
                    interrupted = true;
                }
            }
            E polledElement = storage.poll();
            notifyAll();
            return polledElement;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the configured capacity.
     *
     * <p>NOTE(review): despite the name this is the capacity, not the current element count;
     * kept as-is because callers may rely on it — confirm against the interface contract.
     *
     * @return the maximum number of elements this queue can hold
     */
    @Override
    public int size() {
        return capacity;
    }

    /**
     * Reports whether the queue has reached its capacity.
     *
     * @return {@code true} when no further element can be added without waiting
     */
    public synchronized boolean isFull() {
        return storage.size() >= capacity;
    }

    /**
     * Reports whether the queue currently holds no elements.
     *
     * @return {@code true} when the queue is empty
     */
    public synchronized boolean isEmpty() {
        return storage.isEmpty();
    }
}

/**
 * Runnable that pushes generated messages into a source queue until that queue is full.
 */
public class Producer implements Runnable {
    /** Queue that receives the generated messages. */
    BlockingQueue<String> source;
    /** Name embedded in every generated message. */
    String threadName;

    /**
     * Creates a producer bound to the given queue.
     *
     * @param source     queue to push generated messages into
     * @param threadName name used when generating messages
     */
    public Producer(BlockingQueue<String> source, String threadName) {
        this.threadName = threadName;
        this.source = source;
    }

    /**
     * Offers one freshly generated message per iteration and returns once the source
     * queue reports that it is full.
     */
    @Override
    public void run() {
        while (true) {
            boolean full = source.isFull();
            if (full) {
                break;
            }
            String message = Utilities.generateMessage(threadName);
            source.offer(message);
        }
    }
}

/**
 * Runnable that transfers messages from a source queue to a destination queue,
 * transforming each message on the way.
 */
public class Replacer implements Runnable {
    /** Queue messages are taken from. */
    BlockingQueue<String> source;
    /** Queue transformed messages are put into. */
    BlockingQueue<String> destination;
    /** Name embedded in every transformed message. */
    String threadName;

    /**
     * Creates a replacer working between the two given queues.
     *
     * @param source      queue to take messages from
     * @param destination queue to put transformed messages into
     * @param threadName  name used when transforming messages
     */
    public Replacer(BlockingQueue<String> source,
                    BlockingQueue<String> destination,
                    String threadName) {
        this.threadName = threadName;
        this.destination = destination;
        this.source = source;
    }

    /**
     * Polls one message from the source, transforms it and offers the result to the
     * destination. Synchronized on this {@code Replacer} instance only.
     */
    public synchronized void replace() {
        String original = source.poll();
        String replaced = Utilities.transformMessage(threadName, original);
        destination.offer(replaced);
    }

    /**
     * Continue execution of a thread only if the destination is not full and the source
     * is not empty. The answer is a snapshot; the queues may change before the next
     * {@link #replace()} call.
     */
    private boolean isRunning() {
        return !(destination.isFull() || source.isEmpty());
    }

    /**
     * Transfers messages for as long as {@code isRunning()} holds, then returns.
     */
    @Override
    public void run() {
        while (true) {
            if (!isRunning()) {
                break;
            }
            replace();
        }
    }
}
/**
 * Shared constants and helpers for the producer/replacer demo: message formatting and
 * daemon-thread start-up.
 */
public class Utilities {

    public static final int NUMBER_OF_PRODUCER_THREADS = 3;
    public static final int NUMBER_OF_REPLACER_THREADS = 1000;
    public static final int NUMBER_OF_MESSAGES_TO_READ = 1000;
    public static final int STORAGE_CAPACITY = 100;

    /**
     * Builds the "transferred" message for a replacer thread from the last
     * space-separated token of the original message.
     *
     * @param threadName         name of the transferring thread
     * @param messageToTransform original message; must not be {@code null}
     * @return {@code "Thread #<threadName> transferred message <lastToken>"}, where the
     *         last token is empty if the message contains nothing but spaces
     * @throws NullPointerException if {@code messageToTransform} is {@code null}
     */
    public static String transformMessage(String threadName, String messageToTransform) {
        String[] tokens = messageToTransform.split(" ");
        // split() drops trailing empty strings, so a message made only of spaces yields
        // an empty array; indexing length - 1 would then throw.
        String lastToken = tokens.length == 0 ? "" : tokens[tokens.length - 1];
        return "Thread #" + threadName + " transferred message " + lastToken;
    }

    /**
     * Builds the message a producer thread puts into the source queue.
     *
     * @param threadName name of the generating thread
     * @return {@code "Thread #<threadName> generated message #<threadName>"}
     */
    public static String generateMessage(String threadName) {
        return "Thread #" + threadName + " generated message #" + threadName;
    }

    /**
     * Starts {@code numberOfThreadsToSpawn} daemon threads named
     * {@code threadName + 1 .. threadName + n}.
     *
     * <p>When {@code destination} is {@code null} the threads run {@link Producer}s on
     * {@code source}; otherwise they run {@link Replacer}s from {@code source} to
     * {@code destination}.
     *
     * @param threadName             prefix for thread names
     * @param numberOfThreadsToSpawn number of threads to start
     * @param source                 source queue shared by all spawned tasks
     * @param destination            destination queue, or {@code null} to spawn producers
     */
    public static void spawnDaemonThreads(String threadName,
                                          int numberOfThreadsToSpawn,
                                          BlockingQueue<String> source,
                                          BlockingQueue<String> destination) {

        for (int i = 1; i <= numberOfThreadsToSpawn; i++) {
            String name = threadName + i;
            Runnable task;
            if (destination == null) {
                task = new Producer(source, name);
            } else {
                task = new Replacer(source, destination, name);
            }
            startDaemon(task, name);
        }
    }

    /** Starts {@code task} on a new daemon thread with the given name. */
    private static void startDaemon(Runnable task, String name) {
        Thread thread = new Thread(task);
        thread.setName(name);
        thread.setDaemon(true);
        thread.start();
    }
}