为什么 disruptor 使用较小的环形缓冲区会变慢?

Why is disruptor slower with smaller ring buffer?

Disruptor Getting Started Guide 之后,我构建了一个具有单一生产者和单一消费者的最小破坏者。

制作人

import com.lmax.disruptor.RingBuffer;

public class LongEventProducer
{
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void onData()
    {
        long sequence = ringBuffer.next();
        try
        {
            LongEvent event = ringBuffer.get(sequence);
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}

消费者(注意消费者什么都不做onEvent

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent>
{
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {}
}

我的目标是性能测试绕过一个大的环形缓冲区一次与多次遍历一个较小的环形缓冲区。在每种情况下,总操作数 (bufferSize X rotations) 都是相同的。我发现随着环形缓冲区变小,ops/sec 速率急剧下降。

RingBuffer Size |  Revolutions  | Total Ops   |   Mops/sec

    1048576     |      1        |  1048576    |     50-60

       1024     |      1024     |  1048576    |     8-16

        64      |      16384    |  1048576    |    0.5-0.7

        8       |      131072   |  1048576    |    0.12-0.14

问题: 当环形缓冲区大小减小但总迭代次数固定时,性能大幅下降的原因是什么? 这种趋势独立于 WaitStrategySingle vs MultiProducer - 吞吐量减少,但趋势相同。

Main(注意SingleProducerBusySpinWaitStrategy

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class LongEventMainJava{
        static double ONEMILLION = 1000000.0;
        static double ONEBILLION = 1000000000.0;

    public static void main(String[] args) throws Exception {
            // Executor that will be used to construct new threads for consumers
            Executor executor = Executors.newCachedThreadPool();    

            // TUNABLE PARAMS
            int ringBufferSize = 1048576; // 1024, 64, 8
            int rotations = 1; // 1024, 16384, 131702

            // Construct the Disruptor
            Disruptor disruptor = new Disruptor<>(new LongEventFactory(), ringBufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());

            // Connect the handler
            disruptor.handleEventsWith(new LongEventHandler());

            // Start the Disruptor, starts all threads running
            disruptor.start();

            // Get the ring buffer from the Disruptor to be used for publishing.
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
            LongEventProducer producer = new LongEventProducer(ringBuffer);

            long start = System.nanoTime();
            long totalIterations = rotations * ringBufferSize;
            for (long i = 0; i < totalIterations; i++) {
                producer.onData();
            }
            double duration = (System.nanoTime()-start)/ONEBILLION;
            System.out.println(String.format("Buffersize: %s, rotations: %s, total iterations = %s, duration: %.2f seconds, rate: %.2f Mops/s",
                    ringBufferSize, rotations, totalIterations, duration, totalIterations/(ONEMILLION * duration)));
        }
}

为了 运行,您将需要简单的 Factory 代码

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent>
{
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

运行 核心 i5-2400,12GB 内存,windows 7

示例输出

Buffersize: 1048576, rotations: 1, total iterations = 1048576, duration: 0.02 seconds, rate: 59.03 Mops/s

Buffersize: 64, rotations: 16384, total iterations = 1048576, duration: 2.01 seconds, rate: 0.52 Mops/s

当生产者填满环形缓冲区时,它必须等到事件被消耗才能继续。

当您的缓冲区刚好等于您要放入的元素数量时,生产者永远不必等待。它永远不会溢出。它所做的只是增加一个计数、索引,并在该索引处的环形缓冲区中发布数据。

当您的缓冲区较小时,它仍然只是递增计数并发布,但它的执行速度快于消费者可以使用的速度。因此,生产者必须等到元素被消耗并且环形缓冲区上的 space 被释放。

问题似乎出在 lmax\disruptor\SingleProducerSequencer

中的这段代码中
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence

            long minSequence;
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                waitStrategy.signalAllWhenBlocking();
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }

            this.cachedValue = minSequence;
        }

特别是对 LockSupport.parkNanos(1L) 的调用。这最多可能需要 15ms on Windows。当生产者到达缓冲区的末尾并等待消费者时,将调用它。

其次,当buffer较小时,容易出现RingBuffer的false sharing。我猜想这两种影响都在起作用。

最后,我能够在基准测试之前通过对 onData() 的一百万次调用使用 JIT 来加速代码。这得到了最好的情况> 80Mops/sec,但没有消除缓冲区收缩带来的退化。