以 producer/consumer 模式暂停消费者

Suspend consumer in producer/consumer pattern

我有生产者和消费者与 BlockingQueue 相关联。

消费者等待队列中的记录并处理它:

Record r = mQueue.take();
process(r);

我需要从其他线程暂停此过程一段时间。如何实现?

现在我想这样实现它,但它看起来像一个糟糕的解决方案:

private Object mLock = new Object();
private boolean mLocked = false;

public void lock() {
    mLocked = true;
}

public void unlock() {
    mLocked = false;
    mLock.notify();

}

public void run() {
    ....
            Record r = mQueue.take();
            if (mLocked) {
                mLock.wait();
            }
            process(r);
}

您可以使用java.util.concurrent.locks.Condition Java docs 根据相同条件暂停一会儿。

这种方法对我来说看起来很干净,ReentrantLock 机制比同步 具有更好的吞吐量。阅读以下摘自 IBM article

As a bonus, the implementation of ReentrantLock is far more scalable under contention than the current implementation of synchronized. (It is likely that there will be improvements to the contended performance of synchronized in a future version of the JVM.) This means that when many threads are all contending for the same lock, the total throughput is generally going to be better with ReentrantLock than with synchronized.


BlockingQueue以解决生产者-消费者问题而闻名,它也使用Condition等待。

请参阅下面的示例,摘自 Java 文档的 Condition,这是生产者 - 消费者模式的示例实现。

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
 }

进一步阅读:

我认为你的解决方案简单而优雅,并且认为你应该保留它并进行一些修改。我建议的修改是 synchronization.

没有它,线程干扰和内存一致性错误可能(并且经常)发生。最重要的是,你不能 waitnotify 在你不拥有的锁上(如果你在 synchronized 块中拥有它,你就拥有它..)。修复很简单,只需在 wait/notify 上添加一个 mLock 同步块。此外,当您从不同的线程更改 mLocked 时,您需要将其标记为 volatile.

private Object mLock = new Object();
private volatile boolean mLocked = false;

public void lock() {
    mLocked = true;
}

public void unlock() {
    synchronized(mlock) {
        mLocked = false;
        mLock.notify();
    }

}

public void run() {
    ....
            Record r = mQueue.take();
            synchronized(mLock) {
                while (mLocked) {
                    mLock.wait();
                }
            }
            process(r);
}

创建一个扩展 BlockingQueue 实现的新 class。添加两个新方法 pause()unpause()。在需要的地方,考虑 paused 标志并使用另一个 blockingQueue2 等待(在我的示例中仅在 take() 方法中,而不是在 put() 中):

public class BlockingQueueWithPause<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = 184661285402L;

    private Object lock1 = new Object();//used in pause() and in take()
    private Object lock2 = new Object();//used in pause() and unpause()

    //@GuardedBy("lock")
    private volatile boolean paused;

    private LinkedBlockingQueue<Object> blockingQueue2 = new LinkedBlockingQueue<Object>();

    public void pause() {
        if (!paused) {
            synchronized (lock1) {
            synchronized (lock2) {
                if (!paused) {
                    paused = true;
                    blockingQueue2.removeAll();//make sure it is empty, e.g after successive calls to pause() and unpause() without any consumers it will remain unempty
                }
            }
            }
        }
    }

    public void unpause() throws InterruptedException {
        if (paused) {
            synchronized (lock2) {
                paused = false;
                blockingQueue2.put(new Object());//release waiting thread, if there is one
            }
        }
    }

    @Override
    public E take() throws InterruptedException {
        E result = super.take();

        if (paused) {
            synchronized (lock1) {//this guarantees that a single thread will be in the synchronized block, all other threads will be waiting
                if (paused) {
                    blockingQueue2.take();
                }
            }
        }

        return result;
    }

    //TODO override similarly the poll() method.
}