线程不会根据条件终止,生产者-消费者线程
Thread doesn't terminate upon the condition, Producer-consumer threads
我想实现一个相当简单的任务。有 2 个队列(均有容量限制):BlockingQueue<String> source
和 BlockingQueue<String> destination
。有两种类型的线程:Producer producer
生成一条消息并存储在 BlockingQueue<String> source
。第二个 - Replacer replacer
从源中挑选,转换消息并将其插入 BlockingQueue<String> destination
。
两个questions/issues:
我不确定我是否正确实现了以下要求:如果源不为空且目标未满,则将消息从源传输到目标。
完成我的程序后,仍然有一个 运行 线程调用 - "Signal Dispatcher"。我怎样才能正确终止它?我的程序没有正确终止。
以下是相关实体的实现:
source/destination 队列的实现。
public class BlockingQueueImpl<E> implements BlockingQueue<E> {
private volatile Queue<E> storage = new PriorityQueue<>();
private volatile int capacity;
private volatile int currentNumber;
public BlockingQueueImpl(int capacity) {
this.capacity = capacity;
this.storage = new PriorityQueue<E>(capacity);
}
@Override
public synchronized void offer(E element) {
while (isFull()) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
currentNumber++;
storage.add(element);
notifyAll();
}
@Override
public synchronized E poll() {
while (isEmpty()) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
currentNumber--;
notifyAll();
return storage.poll();
}
@Override
public int size() {
return capacity;
}
public synchronized boolean isFull(){
return currentNumber > capacity;
}
public synchronized boolean isEmpty(){
return currentNumber == 0;
}
}
生产者的实施
public class Producer implements Runnable {
BlockingQueue<String> source;
String threadName;
public Producer(BlockingQueue<String> source, String threadName) {
this.source = source;
this.threadName = threadName;
}
@Override
public void run() {
while (!source.isFull()) {
source.offer(Utilities.generateMessage(threadName));
}
}
}
消费者的实施
public class Replacer implements Runnable {
BlockingQueue<String> source;
BlockingQueue<String> destination;
String threadName;
public Replacer(BlockingQueue<String> source,
BlockingQueue<String> destination,
String threadName) {
this.source = source;
this.destination = destination;
this.threadName = threadName;
}
public synchronized void replace() {
destination.offer(Utilities.transformMessage(threadName, source.poll()));
}
private boolean isRunning() {
return (!destination.isFull()) && (!source.isEmpty());
}
@Override
public void run() {
while (isRunning()) {
replace();
}
}
}
还有助手class
public class Utilities {
public static final int NUMBER_OF_PRODUCER_THREADS = 3;
public static final int NUMBER_OF_REPLACER_THREADS = 1000;
public static final int NUMBER_OF_MESSAGES_TO_READ = 1000;
public static final int STORAGE_CAPACITY = 100;
public static String transformMessage(String threadName, String messageToTransform) {
String[] splittedString = messageToTransform.split(" ");
String newMessage = "Thread #" + threadName + " transferred message " + splittedString[splittedString.length - 1];
return newMessage;
}
public static String generateMessage(String threadName) {
return "Thread #" + threadName + " generated message #" + threadName;
}
public static void spawnDaemonThreads(String threadName,
int numberOfThreadsToSpawn,
BlockingQueue<String> source,
BlockingQueue<String> destination) {
if (destination == null) {
for (int i = 1; i < numberOfThreadsToSpawn + 1; i++) {
String name = threadName + i;
Producer producer = new Producer(source, name);
Thread threadProducer = new Thread(producer);
threadProducer.setName(name);
threadProducer.setDaemon(true);
threadProducer.start();
}
} else {
for (int i = 1; i < numberOfThreadsToSpawn + 1; i++) {
String name = threadName + i;
Replacer replacer = new Replacer(source, destination, name);
Thread threadProducer = new Thread(replacer);
threadProducer.setName(name);
threadProducer.setDaemon(true);
threadProducer.start();
}
}
}
}
主要class:
public class Main {
public static void main(String[] args) {
BlockingQueue<String> source = new BlockingQueueImpl<>(Utilities.STORAGE_CAPACITY);
BlockingQueue<String> destination = new BlockingQueueImpl<>(Utilities.STORAGE_CAPACITY);
// Create, configure and start PRODUCER threads.
Utilities.spawnDaemonThreads("Producer", Utilities.NUMBER_OF_PRODUCER_THREADS, source, null);
// Create, configure and start REPLACER threads.
Utilities.spawnDaemonThreads("Replacer", Utilities.NUMBER_OF_REPLACER_THREADS, source, destination);
// Read NUMBER_OF_MESSAGES_TO_READ from destination.
for (int i = 1; (i < Utilities.NUMBER_OF_MESSAGES_TO_READ) && !destination.isEmpty(); i++) {
System.out.println(destination.poll());
}
}
}
这是工作代码。
/**
* Class {@code BlockingQueueImpl} is the implementation of the Blocking Queue.
* This class provides thread-safe operations
* {@code public void offer(E element)} and {@code public E poll()}
*/
public class BlockingQueueImpl<E> implements BlockingQueue<E> {
private volatile Queue<E> storage = new PriorityQueue<>();
private volatile int capacity;
private volatile int currentNumber;
public BlockingQueueImpl(int capacity) {
this.capacity = capacity;
this.storage = new PriorityQueue<E>(capacity);
}
@Override
public synchronized void offer(E element) {
while (isFull()) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
storage.add(element);
currentNumber++;
notifyAll();
}
@Override
public synchronized E poll() {
E polledElement;
while (isEmpty()) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
notifyAll();
polledElement = storage.poll();
currentNumber--;
return polledElement;
}
@Override
public int size() {
return capacity;
}
public synchronized boolean isFull(){
return currentNumber >= capacity;
}
public synchronized boolean isEmpty(){
return currentNumber == 0;
}
}
public class Producer implements Runnable {
BlockingQueue<String> source;
String threadName;
public Producer(BlockingQueue<String> source, String threadName) {
this.source = source;
this.threadName = threadName;
}
@Override
public void run() {
while (!source.isFull()) {
source.offer(Utilities.generateMessage(threadName));
}
}
}
public class Replacer implements Runnable {
BlockingQueue<String> source;
BlockingQueue<String> destination;
String threadName;
public Replacer(BlockingQueue<String> source,
BlockingQueue<String> destination,
String threadName) {
this.source = source;
this.destination = destination;
this.threadName = threadName;
}
public synchronized void replace() {
destination.offer(Utilities.transformMessage(threadName, source.poll()));
}
//Continue execution of a thread if a destination is not full and source is not empty.
private boolean isRunning() {
return (!destination.isFull()) && (!source.isEmpty());
}
@Override
public void run() {
while (isRunning()) {
replace();
}
}
}
public class Utilities {
public static final int NUMBER_OF_PRODUCER_THREADS = 3;
public static final int NUMBER_OF_REPLACER_THREADS = 1000;
public static final int NUMBER_OF_MESSAGES_TO_READ = 1000;
public static final int STORAGE_CAPACITY = 100;
public static String transformMessage(String threadName, String messageToTransform) {
String[] splittedString = messageToTransform.split(" ");
String newMessage = "Thread #" + threadName + " transferred message " + splittedString[splittedString.length - 1];
return newMessage;
}
public static String generateMessage(String threadName) {
return "Thread #" + threadName + " generated message #" + threadName;
}
public static void spawnDaemonThreads(String threadName,
int numberOfThreadsToSpawn,
BlockingQueue<String> source,
BlockingQueue<String> destination) {
if (destination == null) {
for (int i = 1; i < numberOfThreadsToSpawn + 1; i++) {
String name = threadName + i;
Producer producer = new Producer(source, name);
Thread threadProducer = new Thread(producer);
threadProducer.setName(name);
threadProducer.setDaemon(true);
threadProducer.start();
}
} else {
for (int i = 1; i < numberOfThreadsToSpawn + 1; i++) {
String name = threadName + i;
Replacer replacer = new Replacer(source, destination, name);
Thread threadProducer = new Thread(replacer);
threadProducer.setName(name);
threadProducer.setDaemon(true);
threadProducer.start();
}
}
}
}
我想实现一个相当简单的任务。有 2 个队列(均有容量限制):BlockingQueue<String> source
和 BlockingQueue<String> destination
。有两种类型的线程:Producer producer
生成一条消息并存储在 BlockingQueue<String> source
。第二个 - Replacer replacer
从源中挑选,转换消息并将其插入 BlockingQueue<String> destination
。
两个questions/issues:
我不确定我是否正确实现了以下要求:如果源不为空且目标未满,则将消息从源传输到目标。
完成我的程序后,仍然有一个 运行 线程调用 - "Signal Dispatcher"。我怎样才能正确终止它?我的程序没有正确终止。
以下是相关实体的实现:
source/destination 队列的实现。
public class BlockingQueueImpl<E> implements BlockingQueue<E> {
private volatile Queue<E> storage = new PriorityQueue<>();
private volatile int capacity;
private volatile int currentNumber;
public BlockingQueueImpl(int capacity) {
this.capacity = capacity;
this.storage = new PriorityQueue<E>(capacity);
}
@Override
public synchronized void offer(E element) {
while (isFull()) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
currentNumber++;
storage.add(element);
notifyAll();
}
@Override
public synchronized E poll() {
while (isEmpty()) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
currentNumber--;
notifyAll();
return storage.poll();
}
@Override
public int size() {
return capacity;
}
public synchronized boolean isFull(){
return currentNumber > capacity;
}
public synchronized boolean isEmpty(){
return currentNumber == 0;
}
}
生产者的实施
public class Producer implements Runnable {
BlockingQueue<String> source;
String threadName;
public Producer(BlockingQueue<String> source, String threadName) {
this.source = source;
this.threadName = threadName;
}
@Override
public void run() {
while (!source.isFull()) {
source.offer(Utilities.generateMessage(threadName));
}
}
}
消费者的实施
public class Replacer implements Runnable {
BlockingQueue<String> source;
BlockingQueue<String> destination;
String threadName;
public Replacer(BlockingQueue<String> source,
BlockingQueue<String> destination,
String threadName) {
this.source = source;
this.destination = destination;
this.threadName = threadName;
}
public synchronized void replace() {
destination.offer(Utilities.transformMessage(threadName, source.poll()));
}
private boolean isRunning() {
return (!destination.isFull()) && (!source.isEmpty());
}
@Override
public void run() {
while (isRunning()) {
replace();
}
}
}
还有助手class
public class Utilities {
public static final int NUMBER_OF_PRODUCER_THREADS = 3;
public static final int NUMBER_OF_REPLACER_THREADS = 1000;
public static final int NUMBER_OF_MESSAGES_TO_READ = 1000;
public static final int STORAGE_CAPACITY = 100;
public static String transformMessage(String threadName, String messageToTransform) {
String[] splittedString = messageToTransform.split(" ");
String newMessage = "Thread #" + threadName + " transferred message " + splittedString[splittedString.length - 1];
return newMessage;
}
public static String generateMessage(String threadName) {
return "Thread #" + threadName + " generated message #" + threadName;
}
public static void spawnDaemonThreads(String threadName,
int numberOfThreadsToSpawn,
BlockingQueue<String> source,
BlockingQueue<String> destination) {
if (destination == null) {
for (int i = 1; i < numberOfThreadsToSpawn + 1; i++) {
String name = threadName + i;
Producer producer = new Producer(source, name);
Thread threadProducer = new Thread(producer);
threadProducer.setName(name);
threadProducer.setDaemon(true);
threadProducer.start();
}
} else {
for (int i = 1; i < numberOfThreadsToSpawn + 1; i++) {
String name = threadName + i;
Replacer replacer = new Replacer(source, destination, name);
Thread threadProducer = new Thread(replacer);
threadProducer.setName(name);
threadProducer.setDaemon(true);
threadProducer.start();
}
}
}
}
主要class:
public class Main {
public static void main(String[] args) {
BlockingQueue<String> source = new BlockingQueueImpl<>(Utilities.STORAGE_CAPACITY);
BlockingQueue<String> destination = new BlockingQueueImpl<>(Utilities.STORAGE_CAPACITY);
// Create, configure and start PRODUCER threads.
Utilities.spawnDaemonThreads("Producer", Utilities.NUMBER_OF_PRODUCER_THREADS, source, null);
// Create, configure and start REPLACER threads.
Utilities.spawnDaemonThreads("Replacer", Utilities.NUMBER_OF_REPLACER_THREADS, source, destination);
// Read NUMBER_OF_MESSAGES_TO_READ from destination.
for (int i = 1; (i < Utilities.NUMBER_OF_MESSAGES_TO_READ) && !destination.isEmpty(); i++) {
System.out.println(destination.poll());
}
}
}
这是工作代码。
/**
* Class {@code BlockingQueueImpl} is the implementation of the Blocking Queue.
* This class provides thread-safe operations
* {@code public void offer(E element)} and {@code public E poll()}
*/
public class BlockingQueueImpl<E> implements BlockingQueue<E> {
private volatile Queue<E> storage = new PriorityQueue<>();
private volatile int capacity;
private volatile int currentNumber;
public BlockingQueueImpl(int capacity) {
this.capacity = capacity;
this.storage = new PriorityQueue<E>(capacity);
}
@Override
public synchronized void offer(E element) {
while (isFull()) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
storage.add(element);
currentNumber++;
notifyAll();
}
@Override
public synchronized E poll() {
E polledElement;
while (isEmpty()) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
notifyAll();
polledElement = storage.poll();
currentNumber--;
return polledElement;
}
@Override
public int size() {
return capacity;
}
public synchronized boolean isFull(){
return currentNumber >= capacity;
}
public synchronized boolean isEmpty(){
return currentNumber == 0;
}
}
public class Producer implements Runnable {
BlockingQueue<String> source;
String threadName;
public Producer(BlockingQueue<String> source, String threadName) {
this.source = source;
this.threadName = threadName;
}
@Override
public void run() {
while (!source.isFull()) {
source.offer(Utilities.generateMessage(threadName));
}
}
}
public class Replacer implements Runnable {
BlockingQueue<String> source;
BlockingQueue<String> destination;
String threadName;
public Replacer(BlockingQueue<String> source,
BlockingQueue<String> destination,
String threadName) {
this.source = source;
this.destination = destination;
this.threadName = threadName;
}
public synchronized void replace() {
destination.offer(Utilities.transformMessage(threadName, source.poll()));
}
//Continue execution of a thread if a destination is not full and source is not empty.
private boolean isRunning() {
return (!destination.isFull()) && (!source.isEmpty());
}
@Override
public void run() {
while (isRunning()) {
replace();
}
}
}
public class Utilities {
public static final int NUMBER_OF_PRODUCER_THREADS = 3;
public static final int NUMBER_OF_REPLACER_THREADS = 1000;
public static final int NUMBER_OF_MESSAGES_TO_READ = 1000;
public static final int STORAGE_CAPACITY = 100;
public static String transformMessage(String threadName, String messageToTransform) {
String[] splittedString = messageToTransform.split(" ");
String newMessage = "Thread #" + threadName + " transferred message " + splittedString[splittedString.length - 1];
return newMessage;
}
public static String generateMessage(String threadName) {
return "Thread #" + threadName + " generated message #" + threadName;
}
public static void spawnDaemonThreads(String threadName,
int numberOfThreadsToSpawn,
BlockingQueue<String> source,
BlockingQueue<String> destination) {
if (destination == null) {
for (int i = 1; i < numberOfThreadsToSpawn + 1; i++) {
String name = threadName + i;
Producer producer = new Producer(source, name);
Thread threadProducer = new Thread(producer);
threadProducer.setName(name);
threadProducer.setDaemon(true);
threadProducer.start();
}
} else {
for (int i = 1; i < numberOfThreadsToSpawn + 1; i++) {
String name = threadName + i;
Replacer replacer = new Replacer(source, destination, name);
Thread threadProducer = new Thread(replacer);
threadProducer.setName(name);
threadProducer.setDaemon(true);
threadProducer.start();
}
}
}
}