高竞争的消费者-生产者

Consumer-Producer with high contention

我想知道实现多个生产者 - 单个消费者场景的最佳机制是什么,在这种情况下我必须保持当前未处理请求的数量是最新的。

我的第一个想法是使用 ConcurrentLinkedQueue:

public class SomeQueueAbstraction {

    private Queue<SomeObject> concurrentQueue = new ConcurrentLinkedQueue<>();
    private int size;

    public void add(Object request) {
        SomeObject object = convertIncomingRequest(request);   
        concurrentQueue.add(object);
        size++;

    }

    public SomeObject getHead() {
        SomeObject object = concurrentQueue.poll();
        size--;
    }

    // other methods

问题在于我必须在 addsize ++ 以及 pollsize-- 上显式同步,以便始终准确 size 这使得 ConccurentLinkedQueue 一开始就毫无意义。

在保持数据一致性的同时实现尽可能好的性能的最佳方法是什么?

我应该改用 ArrayDequeue 并显式同步还是有更好的方法来实现此目的?

这里有点类似 question/answer:

java.util.ConcurrentLinkedQueue

讨论了 ConcurrentLinkedQueue 上的复合操作如何自然地不是原子的,但没有直接的答案对于给定的场景什么是最佳选择。

注意:我正在明确计算大小,因为固有 .size() 方法的时间复杂度为 O(n)。

注意 2:我还担心我没有明确编写的 getSize() 方法会增加更多的争用开销。可以调用的比较频繁

我正在寻找处理多个生产者的最有效方法 - 频繁调用 getSize() 的单个消费者。

替代建议:如果 SomeObject 结构中有 elementId,我可以从 ConcurrentLinkedQueue.poll() 获取当前大小,并且只需要在生成此类 id 的机制中进行锁定。 Add 和 get 现在可以在没有额外锁定的情况下正确使用。作为替代方案,这种票价如何?

您可以使用显式锁,这意味着您可能不需要并发队列。

public class SomeQueueAbstraction {

    private Queue<SomeObject> queue = new LinkedList<>();
    private volatile int size;
    private Object lock = new Object();

    public void add(Object request) {
        SomeObject object = convertIncomingRequest(request); 
        synchronized(lock) {  
            queue.add(object);
            size++;
        }
    }

    public SomeObject getHead() {
        SomeObject object = null;
        synchronized(lock) {
            object = queue.poll();
            size--;
        }
        return object;
    }

    public int getSize() {
        synchronized(lock) {
            return size;
        }
    }

    // other methods
}

这样,adding/removing 个元素 to/from 队列和更新 size 将安全完成。

所以要求是报告最新的当前未处理请求数。而且这经常被要求,这确实使 ConcurrentLinkedQueue.size() 不合适。

这可以使用 AtomicInteger 来完成:速度很快,并且总是尽可能接近当前未处理请求的数量。

这里是一个例子,注意一些小的更新以确保报告的大小是准确的:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class SomeQueueAbstraction {

    private final Queue<SomeObject> concurrentQueue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger();

    public boolean add(Object request) {

        SomeObject object = convertIncomingRequest(request);   
        if (concurrentQueue.add(object)) {
            size.incrementAndGet();
            return true;
        }
        return false;
    }

    public SomeObject remove() {

        SomeObject object = concurrentQueue.poll();
        if (object != null) {
            size.decrementAndGet();
        }
        return object;
    }

    public int getSize() { return size.get(); }

    private SomeObject convertIncomingRequest(Object request) { 
        return new SomeObject(getSize()); 
    }

    class SomeObject {
        int id;
        SomeObject(int id) { this.id = id; }
    }
}