如何实现 PriorityBlockingQueue 的循环顺序?

How to implement round-robin order for PriorityBlockingQueue?

我有一个PriorityBlockingQueue。单个线程一次从该队列中消费一条消息并对其进行处理。其他几个线程正在将消息插入队列。生产者线程为他们提交的每条消息分配一个完整的优先级。静态 AtomicLong 用于为每条消息分配一个唯一的、单调递增的 ID。队列的 Comparator 首先按此优先级对消息进行排序,然后按 ID 对同等优先级的消息进行排序(最低 ID 在前)。

问题:有时一个生产者会提交大量消息。这会让其他生产者无法处理他们的消息。我想做的是让生产者之间的消费者轮询以获取同等优先级的消息(同时仍按提交顺序处理单个生产者的同等优先级消息)。但是我不知道如何编写 Comparator 来做到这一点。

我考虑的另一种选择是为每个生产者设置一个单独的队列。但是,我认为这行不通,因为我不知道单线程在多个队列上等待的任何方式。

我觉得为每个制作人设置一个 Queue 更直接。一个线程不能等待多个 Queue,但您可以将所有 Queue 合并到一个助手 class 中,这样它就不需要了。

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.annotation.concurrent.GuardedBy;

public class RoundRobin<P, E> {
    private final Lock lock = new ReentrantLock();
    private final Condition added = lock.newCondition();

    @GuardedBy("lock") private final Map<P, Queue<E>> queues = new LinkedHashMap<>();

    public boolean add(P producer, E item) {
        lock.lock();
        try {
            if (!queues.containsKey(producer)) {
                queues.put(producer, new PriorityBlockingQueue<>());
            }

            added.signalAll();
            return queues.get(producer).add(item);
        } finally {
            lock.unlock();
        }
    }

    public Iterator<E> roundRobinIterator() {
        return new Iterator<E>() {
            private Iterator<? extends Queue<E>> i = null;
            private boolean singlePass = true;

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public E next() {
                lock.lock();
                try {
                    while (true) {
                        if (i == null || !i.hasNext()) {
                            i = queues.values().iterator();
                            singlePass = true;
                        }

                        while (i.hasNext()) {
                            Queue<E> q = i.next();
                            if (!q.isEmpty()) {
                                if (singlePass) {
                                    // copy the iterator to prevent
                                    // ConcurrentModificationExceptions
                                    singlePass = false;
                                    i = copy(i);
                                }
                                return q.poll();
                            }
                        }

                        if (singlePass) {
                            // If singlePass is true then we just checked every
                            // queue and they were all empty.
                            // Wait for another element to be added.
                            added.await();
                        }
                    }
                } catch (InterruptedException e) {
                    throw new NoSuchElementException(e.getMessage());
                } finally {
                    lock.unlock();
                }
            }

            private <T> Iterator<? extends T> copy(Iterator<? extends T> i) {
                List<T> copy = new ArrayList<>();
                while (i.hasNext()) {
                    copy.add(i.next());
                }
                return copy.iterator();
            }
        };
    }
}

一切都取决于您如何分配 ID。将它们 N 分开分配,其中 N 是生产者的数量,并将每个生产者的索引添加到其中。然后依次读取它们将产生循环顺序。您需要做一些簿记工作来了解何时增加基础 ID,这将在您达到 Nx-1 时发生,对于任何 x.

我想我会做这样的事情:

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

public class RRQueue<M> {
    private final ThreadLocal<Queue<M>> threadQueue = new ThreadLocal<>();
    private final List<Queue<M>> queues;
    private int current = 0;

    public RRQueue() {
        this.queues = new ArrayList<>();
    }

    public synchronized void add(M msg) {
        Queue<M> queue = threadQueue.get();
        if (queue == null) {
            queue = new LinkedList<>(); // or whatever
            queues.add(queue);
            threadQueue.set(queue);
        }
        queue.add(msg);
        notify();
    }

    public synchronized M get() throws InterruptedException {
        while (true) {
            for (int i = 0; i < queues.size(); ++i) {
                Queue<M> queue = queues.get(current);
                current = (current+1)%queues.size();
                if (!queue.isEmpty()) {
                    return queue.remove();
                }
            }
            wait();
        }
    }
}

可以使用 PriorityBlockingQueue 实现 N 个生产者之间的循环,如下所示。

为每个生产者维护 N 个 AtomicInteger 计数器,并在生产者计数器相等的情况下维护一个全局计数器作为决胜局。

当添加到生产者的 Q 增强计数器以及全局计数器并存储到 Q 对象中时。 Q 的比较器将根据生产者计数器 1st 进行排序,然后是存储在 Q 对象中的全局计数器值。

然而,当一个生产者的对象在 Q 中有一段时间为空时,相应的计数器会落后,并且当对象开始进入时它将开始占用 Q。

为避免这种情况,维护一个易失性变量,该变量在 de-queued 时随对象的生产者计数器更新。在 en-queue 期间增加生产者计数器后,如果该值小于 volatile 变量中的最后一个 de-queued 计数器,则将计数器重置为该值 + 1。