为什么我的 Disruptor 程序没有充分利用环形缓冲区
Why my Disruptor program don't take full advantage of the ringbuffer
Disruptor github 地址是:https://github.com/LMAX-Exchange/disruptor
我有一个简单的测试如下:
public class DisruptorMain {
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void main(String[] args) throws Exception {
class Element {
private int value;
public int get() {
return value;
}
public void set(int value) {
this.value = value;
}
}
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "simpleThread");
}
};
EventFactory<Element> factory = new EventFactory<Element>() {
@Override
public Element newInstance() {
return new Element();
}
};
EventHandler<Element> handler = new EventHandler<Element>() {
@Override
public void onEvent(Element element, long sequence, boolean endOfBatch) {
try {
Thread.sleep(1000 * sequence);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Element: " + element.get());
}
};
BlockingWaitStrategy strategy = new BlockingWaitStrategy();
int bufferSize = 4;
Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
disruptor.handleEventsWith(handler);
disruptor.start();
RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();
for (int l = 0; l < 8; l++) {
long sequence = ringBuffer.next();
System.out.println("sequence:" + sequence);
try {
Element event = ringBuffer.get(sequence);
event.set(l);
} finally {
ringBuffer.publish(sequence);
}
}
}
}
结果是:
sequence:0
sequence:1
sequence:2
sequence:3
元素:0
元素:1
元素:2
元素:3
sequence:4
sequence:5
sequence:6
sequence:7
元素:4
元素:5
元素:6
元素:7
在我的测试中,我定义了一个大小为 4 的环形缓冲区,我有一个生产者为其创建 8 个任务,我的问题是,当生产者将 4 个任务放入环形缓冲区时,消费者开始从 ringbuffer 中取出任务,任务 1 完成后,ringbuffer 应该有一个空的 space 用于任务 5,但是结果表明,只有当 ringbuffer 中的所有任务都完成时,ringbuffer可以接受新任务,为什么?
这是因为 Disruptor 将批处理事件处理程序。如果事件处理程序很慢或环形缓冲区很小,则批大小通常可以是环形缓冲区的大小。 Disruptor 只会更新该事件处理程序的处理序列,直到批处理完成。这减少了它需要对发布者用来确定 space 是否可用的序列变量进行更新的次数。如果您需要使 space 早于默认值可用,那么您可以使用 SequenceReportingEventHandler 来实现。
public class MyEventHandler implements SequenceReportingEventHandler<Element> {
Sequence processedSequence;
public void setSequenceCallback(Sequence s) {
processedSequence = s;
}
public void onEvent(Element e, long sequence, boolean endOfBatch) {
// Do stuff
processedSequence.set(sequence);
}
}
Disruptor github 地址是:https://github.com/LMAX-Exchange/disruptor
我有一个简单的测试如下:
public class DisruptorMain {
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void main(String[] args) throws Exception {
class Element {
private int value;
public int get() {
return value;
}
public void set(int value) {
this.value = value;
}
}
ThreadFactory threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "simpleThread");
}
};
EventFactory<Element> factory = new EventFactory<Element>() {
@Override
public Element newInstance() {
return new Element();
}
};
EventHandler<Element> handler = new EventHandler<Element>() {
@Override
public void onEvent(Element element, long sequence, boolean endOfBatch) {
try {
Thread.sleep(1000 * sequence);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Element: " + element.get());
}
};
BlockingWaitStrategy strategy = new BlockingWaitStrategy();
int bufferSize = 4;
Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);
disruptor.handleEventsWith(handler);
disruptor.start();
RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();
for (int l = 0; l < 8; l++) {
long sequence = ringBuffer.next();
System.out.println("sequence:" + sequence);
try {
Element event = ringBuffer.get(sequence);
event.set(l);
} finally {
ringBuffer.publish(sequence);
}
}
}
}
结果是: sequence:0 sequence:1 sequence:2 sequence:3 元素:0 元素:1 元素:2 元素:3 sequence:4 sequence:5 sequence:6 sequence:7 元素:4 元素:5 元素:6 元素:7
在我的测试中,我定义了一个大小为 4 的环形缓冲区,我有一个生产者为其创建 8 个任务,我的问题是,当生产者将 4 个任务放入环形缓冲区时,消费者开始从 ringbuffer 中取出任务,任务 1 完成后,ringbuffer 应该有一个空的 space 用于任务 5,但是结果表明,只有当 ringbuffer 中的所有任务都完成时,ringbuffer可以接受新任务,为什么?
这是因为 Disruptor 将批处理事件处理程序。如果事件处理程序很慢或环形缓冲区很小,则批大小通常可以是环形缓冲区的大小。 Disruptor 只会更新该事件处理程序的处理序列,直到批处理完成。这减少了它需要对发布者用来确定 space 是否可用的序列变量进行更新的次数。如果您需要使 space 早于默认值可用,那么您可以使用 SequenceReportingEventHandler 来实现。
public class MyEventHandler implements SequenceReportingEventHandler<Element> {
Sequence processedSequence;
public void setSequenceCallback(Sequence s) {
processedSequence = s;
}
public void onEvent(Element e, long sequence, boolean endOfBatch) {
// Do stuff
processedSequence.set(sequence);
}
}