在 LMAX Disruptor 中的后续消费者之间传递数据(从 unmarchaler 到业务逻辑)?
Passing data between subsequental Consumers in LMAX Disruptor (from unmarchaler to business logic)?
如 https://martinfowler.com/articles/lmax.html, I would need to process my RingBuffer's events first with Unmarchaler and then with Business Logic Processor. Suppose it is configured like (https://lmax-exchange.github.io/disruptor/docs/com/lmax/disruptor/dsl/Disruptor.html)
中所述
Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(MyEvent.FACTORY, 32, Executors.newCachedThreadPool());
EventHandler<MyEvent> handler1 = new EventHandler<MyEvent>() { ... };
EventHandler<MyEvent> handler2 = new EventHandler<MyEvent>() { ... };
disruptor.handleEventsWith(handler1);
disruptor.after(handler1).handleEventsWith(handler2);
当时的想法是 handler1 是 unmarchaler,handler2 使用 handler1 处理的内容。
问题:如何准确地编写 "unmarchaling and putting back to disruptor" 部分的代码?我找到了这个 https://groups.google.com/forum/#!topic/lmax-disruptor/q6h5HBEBRUk 解释,但我不太明白。假设事件到达 handler1
的回调
void onEvent(T event, long sequence, boolean endOfBatch)
(javadoc: https://lmax-exchange.github.io/disruptor/docs/com/lmax/disruptor/EventHandler.html)
从事件中解封一些数据。现在我需要将未编组数据附加到 handler2 的事件中,它将处理未编组对象。
"update"事件需要做什么?修改 "event" 对象就足够了吗?
这的影响实际上取决于您的特定场景,并且一如既往,如果您追求低延迟,则应该同时尝试和基准测试。
最直接的方法是更新 'event' 对象,但是根据您的特定方法,这可能会错过破坏者的很多 single-writer 好处。我会解释并提供一些选择。
假设您有 handler1 和 handler2,handler1 在线程 1 中是 运行,handler2 在线程 2 中是 运行。初始事件发布者在线程 0 上。
- Thread0 将一个条目写入插槽 1 的缓冲区
- Thread1 读取槽 1 中的条目并写入槽 1
- Thread0 将一个条目写入插槽 2 的缓冲区
- 线程 2 从插槽 1 读取并写入输出
- Thread1 读取槽 2 中的条目并写入槽 2
- Thread2 从插槽 2 读取并写入输出
如果你想到物理内存布局,slot1和slot2在内存中最好是相邻的。例如,它们可以是字节数组的某个子集。正如您所看到的,您正在从不同的线程(可能是不同的 cpu 核心)交替读取和写入非常相邻的内存块,这可能导致错误的共享/缓存行跳来跳去。最重要的是,你通过内存的读写不太可能是线性的,所以你会错过 CPU 缓存的一些好处。
其他一些可能更好的选项:
有单独的环形缓冲区,其中第一个环形缓冲区是原始数据,第二个环形缓冲区是未编组的事件。这样数据在内存中被充分分离以避免这些成本。但是,这会影响带宽。
直接在同一个处理程序中完成解组器和工作。根据您的解组器和处理程序中的工作量,这可能是可行的。
如 https://martinfowler.com/articles/lmax.html, I would need to process my RingBuffer's events first with Unmarchaler and then with Business Logic Processor. Suppose it is configured like (https://lmax-exchange.github.io/disruptor/docs/com/lmax/disruptor/dsl/Disruptor.html)
中所述 Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(MyEvent.FACTORY, 32, Executors.newCachedThreadPool());
EventHandler<MyEvent> handler1 = new EventHandler<MyEvent>() { ... };
EventHandler<MyEvent> handler2 = new EventHandler<MyEvent>() { ... };
disruptor.handleEventsWith(handler1);
disruptor.after(handler1).handleEventsWith(handler2);
当时的想法是 handler1 是 unmarchaler,handler2 使用 handler1 处理的内容。
问题:如何准确地编写 "unmarchaling and putting back to disruptor" 部分的代码?我找到了这个 https://groups.google.com/forum/#!topic/lmax-disruptor/q6h5HBEBRUk 解释,但我不太明白。假设事件到达 handler1
的回调void onEvent(T event, long sequence, boolean endOfBatch)
(javadoc: https://lmax-exchange.github.io/disruptor/docs/com/lmax/disruptor/EventHandler.html)
从事件中解封一些数据。现在我需要将未编组数据附加到 handler2 的事件中,它将处理未编组对象。
"update"事件需要做什么?修改 "event" 对象就足够了吗?
这的影响实际上取决于您的特定场景,并且一如既往,如果您追求低延迟,则应该同时尝试和基准测试。
最直接的方法是更新 'event' 对象,但是根据您的特定方法,这可能会错过破坏者的很多 single-writer 好处。我会解释并提供一些选择。
假设您有 handler1 和 handler2,handler1 在线程 1 中是 运行,handler2 在线程 2 中是 运行。初始事件发布者在线程 0 上。
- Thread0 将一个条目写入插槽 1 的缓冲区
- Thread1 读取槽 1 中的条目并写入槽 1
- Thread0 将一个条目写入插槽 2 的缓冲区
- 线程 2 从插槽 1 读取并写入输出
- Thread1 读取槽 2 中的条目并写入槽 2
- Thread2 从插槽 2 读取并写入输出
如果你想到物理内存布局,slot1和slot2在内存中最好是相邻的。例如,它们可以是字节数组的某个子集。正如您所看到的,您正在从不同的线程(可能是不同的 cpu 核心)交替读取和写入非常相邻的内存块,这可能导致错误的共享/缓存行跳来跳去。最重要的是,你通过内存的读写不太可能是线性的,所以你会错过 CPU 缓存的一些好处。
其他一些可能更好的选项:
有单独的环形缓冲区,其中第一个环形缓冲区是原始数据,第二个环形缓冲区是未编组的事件。这样数据在内存中被充分分离以避免这些成本。但是,这会影响带宽。
直接在同一个处理程序中完成解组器和工作。根据您的解组器和处理程序中的工作量,这可能是可行的。