LMAX Disruptor - what determines the batch size?
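I have been learning about the LMAX Disruptor recently and doing some experiments. One thing that puzzles me is the endOfBatch parameter of the EventHandler's onEvent handler method. Consider my code below. First, the dummy message and consumer classes, which I call Test1 and Test1Worker: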
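// Test1.java: dummy message type, carries no payload
public class Test1 {
}

// Test1Worker.java: consumer that simulates slow work and logs each event
import com.lmax.disruptor.EventHandler;

public class Test1Worker implements EventHandler<Test1> {
    @Override
    public void onEvent(Test1 event, long sequence, boolean endOfBatch) {
        try {
            Thread.sleep(500); // stand-in for real-world work
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Received message with sequence " + sequence + ". "
                + "EndOfBatch = " + endOfBatch);
    }
}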
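Note that I have put in a 500 ms delay just as a substitute for some real-world work. I am also printing the sequence number to the console. Then comes my driver class (which acts as the producer), called DisruptorTest: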
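import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DisruptorTest {
    private static Disruptor<Test1> bus1;
    private static ExecutorService test1Workers;

    public static void main(String[] args) {
        // One consumer thread, ring buffer with 8 slots
        test1Workers = Executors.newFixedThreadPool(1);
        bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);
        bus1.handleEventsWith(new Test1Worker());
        RingBuffer<Test1> buf1 = bus1.start();
        for (int i = 0; i < 10; i++) {
            // Measure how long the producer waits to claim the next slot
            long a = System.currentTimeMillis();
            long next = buf1.next();
            long b = System.currentTimeMillis();
            System.out.println("Delay for claiming slot " + i + " is " + (b - a));
            try {
                // A real producer would populate the event here
                Test1 message = buf1.get(next);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Always publish a claimed sequence, even on failure
                buf1.publish(next);
            }
        }
    }

    public static class Test1Factory implements EventFactory<Test1> {
        public Test1 newInstance() {
            return new Test1();
        }
    }
}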
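Here, after initializing the required pieces, I feed 10 messages to the RingBuffer (buffer size 8) and try to monitor a few things: the delay the producer sees when claiming the next slot in the RingBuffer, and, on the consumer side, the messages with their sequence numbers and whether a particular sequence is considered the end of a batch.
Now, interestingly, with the 500 ms delay involved in processing each message, this is the output I get: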
Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Delay for claiming slot 2 is 0
Delay for claiming slot 3 is 0
Delay for claiming slot 4 is 0
Delay for claiming slot 5 is 0
Delay for claiming slot 6 is 0
Delay for claiming slot 7 is 0
Received message with sequence 0. EndOfBatch = true
Delay for claiming slot 8 is 505
Received message with sequence 1. EndOfBatch = false
Received message with sequence 2. EndOfBatch = false
Received message with sequence 3. EndOfBatch = false
Received message with sequence 4. EndOfBatch = false
Received message with sequence 5. EndOfBatch = false
Received message with sequence 6. EndOfBatch = false
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 9 is 3519
Received message with sequence 8. EndOfBatch = true
Received message with sequence 9. EndOfBatch = true
However, if I remove the 500 ms wait, this is what I get:
Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Received message with sequence 0. EndOfBatch = true
Received message with sequence 1. EndOfBatch = true
Delay for claiming slot 2 is 0
Received message with sequence 2. EndOfBatch = true
Delay for claiming slot 3 is 0
Received message with sequence 3. EndOfBatch = true
Delay for claiming slot 4 is 0
Received message with sequence 4. EndOfBatch = true
Delay for claiming slot 5 is 0
Received message with sequence 5. EndOfBatch = true
Delay for claiming slot 6 is 0
Received message with sequence 6. EndOfBatch = true
Delay for claiming slot 7 is 0
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 8 is 1
Delay for claiming slot 9 is 0
Received message with sequence 8. EndOfBatch = false
Received message with sequence 9. EndOfBatch = true
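So it looks like whether a given message is considered the end of a batch (that is, the size of a batch) is influenced by the consumer's message-processing delay. Maybe I am being stupid here, but is that how it should be? What is the reasoning behind it? And what determines the batch size in general? Thanks in advance. Let me know if anything in my question is unclear.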
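The batch size is determined solely by the number of elements available. If more elements are available at that moment, they are included in the batch. For example, if the Disruptor calls your code and there is only one element in the queue, you get a single call with endOfBatch=true. If there are 8 elements in the queue, it collects all 8 and delivers them as one batch: eight onEvent calls, with endOfBatch=true only on the last one.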
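You can see in the code below that the highest "available" sequence in the queue is fetched, and it may be well beyond the "next" item. So, for example, if you have processed 5 and are waiting for slot 6, and then 3 events arrive, available will be 8 and you will receive multiple calls (for 6, 7, 8) as one batch.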
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence)
{
    event = dataProvider.get(nextSequence);
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    nextSequence++;
}
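To make the loop above concrete, here is a minimal, self-contained sketch that mimics only the batching decision, without the Disruptor library. The class BatchSketch and its method endOfBatchFlags are hypothetical names made up for illustration; nextSequence and availableSequence mirror the variables in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is not Disruptor code.
class BatchSketch {
    // Returns the endOfBatch flag for each sequence from nextSequence up to
    // and including availableSequence, computed the same way as in the loop.
    static List<Boolean> endOfBatchFlags(long nextSequence, long availableSequence) {
        List<Boolean> flags = new ArrayList<>();
        while (nextSequence <= availableSequence) {
            flags.add(nextSequence == availableSequence);
            nextSequence++;
        }
        return flags;
    }

    public static void main(String[] args) {
        // Consumer is waiting for sequence 6 and sequences up to 8 are published.
        System.out.println(endOfBatchFlags(6, 8)); // [false, false, true]
        // Only one event is available: a batch of one.
        System.out.println(endOfBatchFlags(3, 3)); // [true]
    }
}
```

A slow consumer lets more events pile up before it next checks the barrier, so its batches get longer. A fast consumer usually finds just one new event, which is why nearly every call in the no-sleep run has EndOfBatch = true.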
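Regarding the roughly 500 ms pause when claiming the ninth element (slot 8), note that the Disruptor is built on a ring buffer, and you have specified the number of slots in the buffer as 8 (see the second parameter here):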
bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);
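If not all consumers have consumed an element and the ring buffer is full (all 8 slots are occupied), the producer is blocked from publishing new events to the buffer. You can try increasing the buffer size, say to about 2 million entries (the Disruptor requires the size to be a power of 2, so for example 2,097,152), or make sure your consumer is faster than the producer so the queue does not fill up (remove the sleep; you have already demonstrated this).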
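The timings in the question can be roughly reproduced with a small simulated-clock model. This is only a sketch under stated assumptions, not Disruptor code. It assumes a single producer that publishes instantly whenever a slot is free, and a single consumer that snapshots the published cursor before the producer reacts to freed slots. It also assumes the consumer releases its slots only after finishing a whole batch, which is how the batch loop above appears to be used. RingBufferTimingSketch and all of its fields are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of one producer and one consumer sharing a
// ring buffer. It is not Disruptor code and ignores real thread scheduling.
class RingBufferTimingSketch {
    final long[] claimDelays;                       // per-slot claim delay in ms
    final List<long[]> batches = new ArrayList<>(); // {first, last} sequence of each batch

    RingBufferTimingSketch(int bufferSize, int totalEvents, long processMillis) {
        claimDelays = new long[totalEvents];
        long time = 0;          // simulated clock in ms
        long waitingSince = 0;  // when the producer began claiming its next slot
        long consumed = -1;     // last sequence the consumer has released
        long cursor = -1;       // last sequence the producer has published
        while (consumed < totalEvents - 1) {
            if (cursor == consumed) {
                // Buffer is empty: the next publish wakes the idle consumer.
                cursor++;
                claimDelays[(int) cursor] = time - waitingSince;
                waitingSince = time;
            }
            // Assumption: the consumer snapshots the cursor before the
            // producer reacts to any newly freed slots.
            long batchEnd = cursor;
            batches.add(new long[] {consumed + 1, batchEnd});
            // While the consumer is busy, the producer fills every slot
            // whose previous occupant has already been released.
            while (cursor + 1 < totalEvents && cursor + 1 - bufferSize <= consumed) {
                cursor++;
                claimDelays[(int) cursor] = time - waitingSince;
                waitingSince = time;
            }
            // Assumption: slots are released only after the whole batch.
            time += (batchEnd - consumed) * processMillis;
            consumed = batchEnd;
        }
    }

    public static void main(String[] args) {
        RingBufferTimingSketch sim = new RingBufferTimingSketch(8, 10, 500);
        System.out.println(Arrays.toString(sim.claimDelays));
        for (long[] b : sim.batches) {
            System.out.println("batch " + b[0] + ".." + b[1]);
        }
    }
}
```

With a buffer of 8, 10 events and 500 ms per event, this model yields claim delays of 0 for slots 0 to 7, 500 for slot 8 and 3500 for slot 9, with batches 0..0, 1..7, 8..8 and 9..9. That is close to the 505 ms and 3519 ms measured in the question. Under these assumptions, slot 9 waits so long because it reuses the slot of sequence 1, which is not released until the whole 1..7 batch (7 x 500 ms) has been processed.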