LinkedBlockingQueue 中的 addAll() 是线程安全的(如果不是,则为解决方案)?

addAll() in LinkedBlockingQueue is thread safe (and solution if it's not)?

引用文档:

"BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control. However, the bulk Collection operations addAll, containsAll, retainAll and removeAll are not necessarily performed atomically unless specified otherwise in an implementation. So it is possible, for example, for addAll(c) to fail (throwing an exception) after adding only some of the elements in c."

由于在 LinkedBlockingQueue.addAll() 操作的描述中没有写任何特别的东西,我不得不假设这不是线程安全的。

你是否同意我的看法,为了保证通过 addAll() 添加的所有元素都是连续的(即添加在一起)的唯一解决方案是每次修改队列时使用 Lock (使用 addtake 操作)?例如:

BlockingQueue<T> queue = new LinkedBlockingQueue<>();
Lock lock = new ReentrantLock();

//somewhere, some thread...
lock.lock();
queue.addAll(someCollection);
lock.unlock();

//somewhere else, (maybe) some other thread...
lock.lock();
queue.take();
lock.unlock();

重要更新: 哇,没有人在前面的例子中看到一个大错误:因为 take() 是一个阻塞操作,并且因为需要锁才能将元素添加到队列中,所以一旦队列为空,程序就会进入处于死锁状态:写入器无法写入,因为 take() 拥有锁,同时 take() 将处于阻塞状态,直到队列中写入某些内容(它可以' 由于先前的原因而发生)。 有什么想法吗? 我认为最明显的方法是移除 take() 周围的锁,但可能无法保证 addAll() 所需的原子性。

我相信您混淆了 线程安全原子。据我了解,批量操作是线程安全的,但不是原子的。

我认为您不需要使用外部 ReentrantLock 来使您的 BlockingQueue 是线程安全的。实际上,addAll() 是通过遍历给定的 Collection 并在队列中为集合中的每个元素调用 add() 来实现的。由于 add() 是线程安全的,因此您不需要同步任何东西。

当 javadocs 这么说时:

So it is possible, for example, for addAll(c) to fail (throwing an exception) after adding only some of the elements in c.

这意味着当 addAll(c) returns 时可能只添加了给定集合 c 的一些元素。但是,这并不意味着您需要锁定队列才能调用 take() 或任何其他操作。

编辑:

根据您的用例,您可以按照您的建议使用锁,但我会将其置于 BlockingQueue 实现的内部,这样调用者 类 就不需要遵守lock/call_some_method_from_the_queue/unlock 模式。使用锁时我会更加小心,即在 try/finally 块中使用它:

public class MyQueue<T> extends LinkedBlockingQueue<T> {

    private final Lock lock = new ReentrantLock();

    @Override
    public boolean addAll(Collection<T> c) {
        boolean r = false;
        try {
            this.lock.lock();
            r = super.addAll(c);
        } finally {
            this.lock.unlock();
        }
        return r;
    }

    @Override
    public void add(T e) {
        try {
            this.lock.lock();
            super.add(e);
        } finally {
            this.lock.unlock();
        }
    }

    // You don't need to lock on take(), since
    // it preserves the order in which elements
    // are inserted and is already thread-safe
}

addAll 仍然是线程安全的,只是不提供原子性。 所以这取决于您的用例/期望。

如果您在没有显式锁定的情况下使用 addAll,那么如果其他线程尝试写入队列(添加新元素),则无法保证添加元素的顺序,并且它们可能会混合。如果这是一个问题而不是你需要锁定。但是 addAll 仍然是线程安全的,不会有队列的损坏。

但通常情况下,队列用于提供许多 Readers/Writers 之间的通信方式,可能不需要严格保留插入顺序。

现在,主要问题是,如果队列已满,add 方法会抛出异常,因此 addAll 操作可能会在中间崩溃,您不知道哪些元素被添加了哪些没有。

如果您的用例允许等待 space 插入元素,那么您应该在循环中使用 put。

for (E e: someCollection) queue.put(e);

这将阻塞直到有 space 添加另一个元素。

手动加锁比较麻烦,因为访问队列时总要记得加锁,容易出错。因此,如果您确实需要原子性,请编写一个实现 BlockingQUeue 接口但在调用底层操作之前使用锁定的包装器 class。