在单个消费者和任意数量的生产者的情况下,生产者-消费者仅使用 notify()

Producer-Consumer using only notify() in the case of a single Consumer and an arbitrary number of Producers

下面是我实现生产者-消费者问题的代码。使用 notifyAll() 一切正常,但是由于性能原因,我想将所有 notifyAll() 替换为 notify()

我看到通过将 notifyAll() 更改为 notify() 来替换这些调用会导致发生死锁。但是,替换这些调用的所有其他尝试都失败了。

是否有一些聪明的方法可以用 notify() 替换这些调用,从而使下面的代码适用于单个消费者和任意数量的生产者?

public class Buffer
{
    private volatile String content = "";
    private volatile boolean isEmpty = true;

    public synchronized void addItem(String s)
    {
        while(!isEmpty){
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        content = s;
        isEmpty = false;
        notifyAll();

    }

    public synchronized String getItem()
    {
        while(isEmpty) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        String temp = content;
        isEmpty = true;
        notifyAll();
        return temp;
    }
}

public class Producer implements Runnable
{
    private String greeting;
    private int repetitions;
    private Buffer b;

    public Producer(String aGreeting, int aRepetitions, Buffer aBuffer){
        greeting = aGreeting;
        repetitions = aRepetitions;
        b = aBuffer;
    }

    public void run()
    {
        for(int i = 1; i <= repetitions; i++) {
            b.addItem(greeting + i);
        }
    }
}


public class Consumer implements Runnable {
    private String greeting;
    private Buffer b;
    public Consumer(String aGreeting, Buffer aBuffer){
        greeting = aGreeting;
        b = aBuffer;
    }
    public void run()
    {
        try
        {
            while(true){
                System.out.println(greeting + b.getItem());
                Thread.sleep(100);
            }
        }
        catch(InterruptedException exception){}
    }
}

简而言之:虽然 notifyAll() 通知所有等待线程,notify() 通知任何随机线程,但现在这个随机线程可能不是您下一个需要的线程,这可能会导致死锁。请参考这个例子:

以下步骤使我们陷入僵局。让我们将限制设置为 1 以使示例简短。

E1 enqueues an item.
E2 attempts enqueue - checks wait loop - already full - waits

E3 attempts enqueue - checks wait loop - already full - waits

D1 attempts dequeue - and is executing synchronized block
D2 attempts dequeue - blocks on entry to the (synchronized) block - due to D1

D3 attempts dequeue - blocks on entry to the (synchronized) block - due to D1

D1 is executing enqueue - gets the item, calls notify, exits method
The notify happens to wake up E2 (i.e. "any waiting thread")
BUT, D2 enters sync block before E2 can (E2 must reacquire the lock), so E2 blocks on entry to the enqueue sync block
D2 checks wait loop, no more items in queue, so waits

D3 enters block after D2, but before E2, checks wait loop, no more items in queue, so waits

Now there is E3, D2, and D3 waiting!

Finally E2 acquires the lock, enqueues an item, calls notify, exits method

E2's notification wakes E3 (remember any thread can be woken)
E3 checks the wait loop condition, there is already an item in the queue, so waits.
NO MORE THREADS TO CALL NOTIFY and THREE THREADS PERMANENTLY SUSPENDED!

解决方案:将 notify 替换为 notifyAll

由于参考,notify() instead of notifyAll() for blocking queue

为了能够使用.notify(),您需要保证,任何可能唤醒线程"consume" 整个 "reason" 的通知.

例如,在您的情况下 consumer(方法 .get_itemfrees space for single element in the缓冲。这是消费者通知的原因。因为你使用单一消费者模型,所以只有生产者(方法.add_item)可以因为这个通知而被唤醒。 生产者 使用整个释放的元素将信息存储到其中。

所以,使用 .notify() 是消费者没问题

从另一方面来说,因为你使用了多个生产者,一个生产者的通知可能会唤醒另一个另一个[=47] =]制作人。当然,一个生产者不会消耗另一个生产者的影响。

所以,使用 .notify() 是生产者是不好的

解决问题的最自然方法是使用不同的通知:一种用于消费者,一种用于生产者。因此,生产者中的通知只能唤醒消费者,消费者消费生产者存储的信息。使用 Condition:

可以实现同一临界区下的不同通知
public class Buffer
{
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition(); 
    final Condition notEmpty = lock.newCondition(); 

    // `volatile` isn't needed for objects accessed under critical section
    private String content = "";
    private boolean isEmpty = true;

    // Use lock instead of `synchronized`.
    public void addItem(String s)
    {
        lock.lock();
        try {
            while(!isEmpty){
                try {
                    notFull.await(); // Analogue for wait()
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            content = s;
            isEmpty = false;
            notEmpty.signal(); // Analogue for notify()
        } finally {
            lock.unlock();
        }    
    }

    // Use lock instead of `synchronized`.
    public String getItem()
    {
        lock.lock();
        try {
            while(isEmpty) {
                try {
                    notEmpty.await(); // Analogue for wait()
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            String temp = content;
            isEmpty = true;
            notFull.signal(); // Analogue for notify()
            return temp;
        } finally {
            lock.unlock();
        }    
    }
}