在 LMAX Disruptor 中的后续消费者之间传递数据(从 unmarchaler 到业务逻辑)?

Passing data between subsequental Consumers in LMAX Disruptor (from unmarchaler to business logic)?

https://martinfowler.com/articles/lmax.html, I would need to process my RingBuffer's events first with Unmarchaler and then with Business Logic Processor. Suppose it is configured like (https://lmax-exchange.github.io/disruptor/docs/com/lmax/disruptor/dsl/Disruptor.html)

中所述
  Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(MyEvent.FACTORY, 32, Executors.newCachedThreadPool());
  EventHandler<MyEvent> handler1 = new EventHandler<MyEvent>() { ... };
  EventHandler<MyEvent> handler2 = new EventHandler<MyEvent>() { ... };
 disruptor.handleEventsWith(handler1);
 disruptor.after(handler1).handleEventsWith(handler2);

当时的想法是 handler1 是 unmarchaler,handler2 使用 handler1 处理的内容。

问题:如何准确地编写 "unmarchaling and putting back to disruptor" 部分的代码?我找到了这个 https://groups.google.com/forum/#!topic/lmax-disruptor/q6h5HBEBRUk 解释,但我不太明白。假设事件到达 handler1

的回调
void onEvent(T event, long sequence, boolean endOfBatch) 

(javadoc: https://lmax-exchange.github.io/disruptor/docs/com/lmax/disruptor/EventHandler.html)

从事件中解封一些数据。现在我需要将未编组数据附加到 handler2 的事件中,它将处理未编组对象。

"update"事件需要做什么?修改 "event" 对象就足够了吗?

这的影响实际上取决于您的特定场景,并且一如既往,如果您追求低延迟,则应该同时尝试和基准测试。

最直接的方法是更新 'event' 对象,但是根据您的特定方法,这可能会错过破坏者的很多 single-writer 好处。我会解释并提供一些选择。

假设您有 handler1 和 handler2,handler1 在线程 1 中是 运行,handler2 在线程 2 中是 运行。初始事件发布者在线程 0 上。

  • Thread0 将一个条目写入插槽 1 的缓冲区
  • Thread1 读取槽 1 中的条目并写入槽 1
  • Thread0 将一个条目写入插槽 2 的缓冲区
  • 线程 2 从插槽 1 读取并写入输出
  • Thread1 读取槽 2 中的条目并写入槽 2
  • Thread2 从插槽 2 读取并写入输出

如果你想到物理内存布局,slot1和slot2在内存中最好是相邻的。例如,它们可以是字节数组的某个子集。正如您所看到的,您正在从不同的线程(可能是不同的 cpu 核心)交替读取和写入非常相邻的内存块,这可能导致错误的共享/缓存行跳来跳去。最重要的是,你通过内存的读写不太可能是线性的,所以你会错过 CPU 缓存的一些好处。

其他一些可能更好的选项:

  • 有单独的环形缓冲区,其中第一个环形缓冲区是原始数据,第二个环形缓冲区是未编组的事件。这样数据在内存中被充分分离以避免这些成本。但是,这会影响带宽。

  • 直接在同一个处理程序中完成解组器和工作。根据您的解组器和处理程序中的工作量,这可能是可行的。