LinkedBlockingQueue put 和 take 功能实现
LinkedBlockingQueue put and take functional implementation
我正在查看 LinkedBlockingQueue 的内部实现
put(E e) 和 take() 函数。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
**notEmpty.signal();**
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
**notFull.signal();**
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
在两种方法中都没有得到为什么在与容量比较后调用 signal() on condition。
如果有人能解释一下,将不胜感激。
这是我可以考虑使用的一种情况:
if (c > 1)
notEmpty.signal();
假设队列为空,有3个线程,thread_1, thread_2, thread_3.
- thread_1 调用
take()
,在 notEmpty.await()
被阻止。
- thread_2 调用
take()
,在 notEmpty.await()
被阻止。
- thread_3 调用
take()
,在 notEmpty.await()
被阻止。
然后,还有其他3个线程,thread_4,thread_5,thread_6。
- thread_4调用
put()
,在队列中添加一个元素,signal
thread_1.
- thread_1醒醒,重新获取
takeLock
,尝试取第一个元素
- 同时,thread_5调用
put()
,在队列中添加另一个元素,signal
thread_2.
- thread_2醒来,尝试重新获取
takeLock
,但是thread_1现在正在持有锁,所以它必须等待
- thread_6 中断 thread_2,thread_2 抛出
InterruptedException
并终止。
- thread_1 取第一个元素。但是,这个队列中还有另一个元素,所以上面代码中的
c > 1
出现了。如果thread_1不调用signal
,thread_3不能唤醒取第二个元素
- thread_1调用
signal
,thread_3唤醒,取第二个元素
我正在查看 LinkedBlockingQueue 的内部实现 put(E e) 和 take() 函数。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
**notEmpty.signal();**
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
**notFull.signal();**
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
在两种方法中都没有得到为什么在与容量比较后调用 signal() on condition。 如果有人能解释一下,将不胜感激。
这是我可以考虑使用的一种情况:
if (c > 1)
notEmpty.signal();
假设队列为空,有3个线程,thread_1, thread_2, thread_3.
- thread_1 调用
take()
,在notEmpty.await()
被阻止。 - thread_2 调用
take()
,在notEmpty.await()
被阻止。 - thread_3 调用
take()
,在notEmpty.await()
被阻止。
然后,还有其他3个线程,thread_4,thread_5,thread_6。
- thread_4调用
put()
,在队列中添加一个元素,signal
thread_1. - thread_1醒醒,重新获取
takeLock
,尝试取第一个元素 - 同时,thread_5调用
put()
,在队列中添加另一个元素,signal
thread_2. - thread_2醒来,尝试重新获取
takeLock
,但是thread_1现在正在持有锁,所以它必须等待 - thread_6 中断 thread_2,thread_2 抛出
InterruptedException
并终止。 - thread_1 取第一个元素。但是,这个队列中还有另一个元素,所以上面代码中的
c > 1
出现了。如果thread_1不调用signal
,thread_3不能唤醒取第二个元素 - thread_1调用
signal
,thread_3唤醒,取第二个元素