如何避免多生产者和消费者的饥饿?
How to avoid starvation in multi producer and consumer?
- 这里考虑 2 个生产者线程和 1 个消费者线程。
- 假设队列已满。
- 两个生产者线程进入等待状态,因为队列已满。
- 消费者线程从队列中获取元素并通知所有,因此一个生产者线程添加元素并退出,另一个生产者线程保持等待状态,另一个生产者线程再次添加元素并退出。
- 因此,如果您观察到一个线程可能始终处于等待状态。
如何避免这种情况?
import java.util.LinkedList;
import java.util.List;
interface BlockingQueueCustom<E> {
void put(E item) throws InterruptedException ;
E take() throws InterruptedException;
}
class LinkedBlockingQueueCustom<E> implements BlockingQueueCustom<E> {
private List<E> queue;
private int maxSize; // maximum number of elements queue can hold at a time.
public LinkedBlockingQueueCustom(int maxSize) {
this.maxSize = maxSize;
queue = new LinkedList<E>();
}
public synchronized void put(E item) throws InterruptedException {
while(queue.size() == maxSize) {
this.wait();
}
queue.add(item);
this.notifyAll();
}
public synchronized E take() throws InterruptedException {
while(queue.size() == 0) {
this.wait();
}
this.notifyAll();
return queue.remove(0);
}
}
public class BlockingQueueCustomTest {
public static void main(String[] args) throws InterruptedException {
BlockingQueueCustom<Integer> b = new LinkedBlockingQueueCustom<Integer>(10);
System.out.println("put(11)");
b.put(11);
System.out.println("put(12)");
b.put(12);
System.out.println("take() > " + b.take());
System.out.println("take() > " + b.take());
}
}
wait
和 notify
的使用已经过时 since 2005 因为它的功能有限。
对于您的具体问题,我真的建议重构您的解决方案以使用 Java Semaphore class。你会看到你可以设置一个公平参数。此参数将确保分配以 FIFO 方式完成,以便一旦您的一个线程获得许可并将数据放入您的队列中,它会在再次阻塞时被带到行尾(因此,第二个线程将获得优先权)。
希望对您有所帮助。
有几个选项。一种是使用标准 ArrayBlockingQueue
调用两个参数的构造函数。在这种情况下,队列将按照 FIFO 顺序处理线程,生产者将按照他们尝试将元素放入队列的相同顺序将元素添加到队列中。
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {...}
另一种选择是使用 ReentrantReadWriteLock
并打开公平参数而不是 synchronized
块。
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {...}
这是一个如何将 synchronized
块转换为带有 ReentrantReadWriteLock
的代码的示例:https://javarevisited.blogspot.com/2015/06/java-lock-and-condition-example-producer-consumer.html
- 这里考虑 2 个生产者线程和 1 个消费者线程。
- 假设队列已满。
- 两个生产者线程进入等待状态,因为队列已满。
- 消费者线程从队列中获取元素并通知所有,因此一个生产者线程添加元素并退出,另一个生产者线程保持等待状态,另一个生产者线程再次添加元素并退出。
- 因此,如果您观察到一个线程可能始终处于等待状态。
如何避免这种情况?
import java.util.LinkedList;
import java.util.List;
interface BlockingQueueCustom<E> {
void put(E item) throws InterruptedException ;
E take() throws InterruptedException;
}
class LinkedBlockingQueueCustom<E> implements BlockingQueueCustom<E> {
private List<E> queue;
private int maxSize; // maximum number of elements queue can hold at a time.
public LinkedBlockingQueueCustom(int maxSize) {
this.maxSize = maxSize;
queue = new LinkedList<E>();
}
public synchronized void put(E item) throws InterruptedException {
while(queue.size() == maxSize) {
this.wait();
}
queue.add(item);
this.notifyAll();
}
public synchronized E take() throws InterruptedException {
while(queue.size() == 0) {
this.wait();
}
this.notifyAll();
return queue.remove(0);
}
}
public class BlockingQueueCustomTest {
public static void main(String[] args) throws InterruptedException {
BlockingQueueCustom<Integer> b = new LinkedBlockingQueueCustom<Integer>(10);
System.out.println("put(11)");
b.put(11);
System.out.println("put(12)");
b.put(12);
System.out.println("take() > " + b.take());
System.out.println("take() > " + b.take());
}
}
wait
和 notify
的使用已经过时 since 2005 因为它的功能有限。
对于您的具体问题,我真的建议重构您的解决方案以使用 Java Semaphore class。你会看到你可以设置一个公平参数。此参数将确保分配以 FIFO 方式完成,以便一旦您的一个线程获得许可并将数据放入您的队列中,它会在再次阻塞时被带到行尾(因此,第二个线程将获得优先权)。
希望对您有所帮助。
有几个选项。一种是使用标准 ArrayBlockingQueue
调用两个参数的构造函数。在这种情况下,队列将按照 FIFO 顺序处理线程,生产者将按照他们尝试将元素放入队列的相同顺序将元素添加到队列中。
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {...}
另一种选择是使用 ReentrantReadWriteLock
并打开公平参数而不是 synchronized
块。
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {...}
这是一个如何将 synchronized
块转换为带有 ReentrantReadWriteLock
的代码的示例:https://javarevisited.blogspot.com/2015/06/java-lock-and-condition-example-producer-consumer.html