LMAX Disruptor 作为阻塞队列?

LMAX Disruptor as a blocking queue?

有什么方法可以将两者都放在一个结构中 -

  1. BlockingQueue 的语义,即 - 非阻塞查看、阻塞轮询和阻塞放置。多个提供者一个消费者。
  2. RingBuffer,它有效地用作对象池,所以我不想将新对象放入环形缓冲区,而是想在那里重用现有对象,复制状态。所以基本上 LMAX disruptor 的功能开箱即用。

有没有类似的东西? 我想我可以尝试为此使用 Disruptor,如果我理解正确的话,我已经可以将它用作带有阻塞 put 的阻塞队列(如果环形缓冲区是 "full")。它已经具有我需要的 "reusable objects" 语义。所以唯一的问题是如何创建一个能够 PULL 对象(而不是使用回调)的客户端,因为我不太熟悉内部 Disruptor 结构——可以做到吗?使用所有这些音序器,创建一个新的 EventProcessor 或类似的东西?

不,在客户端有一个阻塞队列并从中获取的明显解决方案并不是一个理想的解决方案,因为它打破了使用中断对象池的全部要点 - 你需要有一个现在新建池,或者在放入阻塞队列之前在回调中创建一个新对象等,我根本不想创建任何垃圾。

那么有没有办法用 Disruptor 或任何其他性能 oriented/garbage 免费 java 库来实现它?

出于好奇,我无法从 Disruptor 本身获得 "blocking pull" 语义,但当然向非阻塞拉动添加 "blocking" 功能是微不足道的。 "Peek" 功能本身是可能的,但效率不高(您需要在每次查看时一次又一次地复制该项目)并且可以通过缓存 "poll".

的结果来替换

所以,最小的原始解决方案,只实现了我需要的方法:

public class DisruptorMPSCQueue<T extends ICopyable<T>> {

    private final RingBuffer<T> ringBuffer;
    private final EventPoller<T> eventPoller;
    private T tempPolledEvent;

    private EventPoller.Handler<T> pollerHandler = new EventPoller.Handler<T>() {
        @Override
        public boolean onEvent(final T event, final long sequence, final boolean endOfBatch) throws Exception {
            tempPolledEvent.copyFrom(event);
            return false;
        }
    };

    public DisruptorMPSCQueue(EventFactory<T> typeConstructor, int size) {
        ringBuffer = RingBuffer.createMultiProducer(typeConstructor, size);
        eventPoller = ringBuffer.newPoller();
        ringBuffer.addGatingSequences(eventPoller.getSequence());
    }

    /**
     * Blocking, can be called from any thread, the event will be copied to the ringBuffer
     */
    public void put(final T event) {
        long sequence = ringBuffer.next(); // blocked by ringBuffer's gatingSequence
        ringBuffer.get(sequence).copyFrom(event);
        ringBuffer.publish(sequence);
    }

    /**
     * Not blocking, can be called from any thread, the event will be copied to the ringBuffer
     *
     * @throws IllegalStateException if the element cannot be added at this time due to capacity restrictions
     */
    public void offer(final T event) {
        long sequence;
        try {
            sequence = ringBuffer.tryNext();
        } catch (InsufficientCapacityException e) {
            throw new IllegalStateException(e); // to mimic blockingQueue
        }
        ringBuffer.get(sequence).copyFrom(event);
        ringBuffer.publish(sequence);
    }

    /**
     * Retrieve top of the queue(removes from the queue). NOT thread-safe, can be called from one thread only.
     *
     * @param destination top of the queue will be copied to destination
     * @return destination object or null if the queue is empty
     */
    public T poll(final T destination) {
        try {
            tempPolledEvent = destination;  // yea, the poller usage is a bit dumb
            EventPoller.PollState poll = eventPoller.poll(pollerHandler);
            if (poll == EventPoller.PollState.PROCESSING) {
                return tempPolledEvent;
            } else {
                return null;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

我们在今年早些时候开源了 Conversant Diruptor,其中包括 DiruptorBlockingQueue。您可以在 github

上找到代码

Conversant Disruptor 几乎可以轻松包含在任何项目中,因为它支持 BlockingQueue api 并且已在 Maven Central 上发布。