如何控制多个生产者同时访问多个共享队列?

How to control simultaneous access to multiple shared queues by multiple producers?

一种方法是锁定然后检查第一个共享队列的状态,如果 space 可用则推送数据,如果不可用则忽略,然后解锁。

然后检查第二个共享队列的状态,如果space可用则推送数据,否则忽略,然后解锁。

如此等等。

在这里,我们将不断地锁定和解锁以查看共享队列的状态,然后采取相应的行动。


问题:

这种方法有什么缺点?当然时间会花在加锁和解锁上。是吗?

没有当前方法的缺点,还有哪些其他方法可以达到相同的效果?

真的,你在这里赚的太多了 locks/unlocks。解决办法是做同样的检查两次:

check if space is available, if not, continue
lock
check if space is available AGAIN 
... go on as you did before. 

如果您只在极少数情况下不需要这样做,您将通过这种方式进行锁定。

第一次在书上看到解法"Professional Java EE Design Patterns"(叶儿,Theedom)

编辑。

关于在线程之间分配开始队列号。

注意,在没有任何特殊组织的情况下,这些线程只是第一次等待队列。下一次必要的时移将通过简单的等待来创建。当然,我们可以自己创建时移,在线程之间传播起始编号。并且通过等量转移的简单传播将比随机传播更有效。

锁争用非常昂贵,因为它需要上下文切换 - 请参阅 the LMAX Disruptor for a more in-depth explanation, in particular the performance results page; the Disruptor is a lock-free 有界队列比使用锁的有界队列表现出更少的延迟。

减少锁争用的一种方法是让您的生产者以彼此不同的顺序检查队列,例如,而不是每个生产者检查 Queue1,然后是 Queue2,...最后是 QueueN,每个生产者会重复在 [1, N] 之间生成一个随机数,然后检查 Queue[Rand(N)]。一个更复杂的解决方案是维护一组根据可用 space 排序的队列(例如,在 Java 中,这将是 ConcurrentSkipListSet),然后让每个生产者从队列中删除队列集合的头部(即具有最多可用 space 且未被另一个生产者同时访问的队列),添加一个元素,并将队列插入回集合中;同样,一个更简单的解决方案是维护一个无限制的未排序队列,并让生产者从队列的头部移除并检查队列,然后将队列插入回队列的尾部,这将确保只有一个生产者能够在任何给定时间点检查队列。

另一种解决方案是减少并理想地消除锁的数量 - 编写无锁算法很困难,但它也可能非常有益,正如 LMAX 的无锁队列的性能所证明的那样。代替用 LMAX 的无锁有界队列替换你的锁定有界队列,另一种解决方案是用无锁无界队列替换你的锁定有界队列(例如 Java 的 ConcurrentLinkedQueue; lock-free unbounded queues are much more likely to be in your language's standard library than lock-free bounded queues) and to place conservative lock-free guards on these queues. For example, using Java's AtomicInteger 用于守卫:

public class BoundedQueue<T> {
    private ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private AtomicInteger bound = new AtomicInteger(0);
    private final int maxSize;

    public BoundedQueue(int maxSize) {
        this.maxSize = maxSize;
    }

    public T poll() {
        T retVal = queue.poll();
        if(retVal != null) {
            bound.decrementAndGet();
        }
        return retVal;
    }

    public boolean offer(T t) {
        if(t == null) throw new NullPointerException();
        int boundSize = bound.get();
        for(int retryCount = 0; retryCount < 3 && boundSize < maxSize; retryCount++) {
            if(bound.compareAndSet(boundSize, boundSize + 1)) {
                return queue.offer(t);
            }
            boundSize = bound.get();
        }
        return false;            
    }
}

poll() 将 return 来自队列头部的元素,如果头部元素不为空,即如果队列不为空,则递减 boundoffer(T t) 尝试在不超过 maxSize 的情况下增加 bound 的大小,如果成功则将元素放在队列的尾部,否则如果失败三次则方法 return是错误的。这是一个保守的守卫,因为即使队列未满,offer 也有可能失败,例如如果在 boundSize = bound.get()boundSize 设置为 maxSize 之后删除了一个元素,或者由于多个消费者调用 poll()bound.compareAndSet(expected, newVal) 方法恰好失败了三次。 =26=]