LMAX Disruptor 依赖性 Graph/Gating 与 SequenceBarrier
LMAX Disruptor Dependency Graph/Gating with SequenceBarrier
目标
我正在尝试在处理程序之间创建一种有点循环的依赖关系,但我不太清楚如何正确处理。我想要实现的是 producer -> [handlers 1-3] -> handler 4
.
的变体
所以,disruptor.handleEventsWith(h1, h2, h3).then(h4);
。但我有额外的要求
- 虽然处理程序 1-3 会并行处理消息,但其中 none 会开始处理下一条消息,直到他们都处理完上一条消息。
- 在第一条消息之后,处理程序 1-3 等待处理程序 4 处理完最近的消息,然后再处理下一条消息。
使用单个事件处理程序的等效执行逻辑可以是:
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
Arrays.asList(h1, h2, h3).parallelStream()
.forEach(h -> h.onEvent(event, sequence, endOfBatch));
h4.onEvent(event, sequence, endOfBatch);
});
上下文
设计上下文是处理程序 1-3 各自根据消息更新自己的状态,并且在消息被三个中的每一个处理后它们处于一致的状态。处理程序 4 然后根据处理程序 1-3 更新的状态运行一些逻辑。因此处理程序 4 应该只看到由 1-3 维护的数据结构的一致状态,这意味着处理程序 1-3 不应该在处理程序 4 完成之前处理下一条消息。
(虽然目标肯定是使用 Disruptor 来管理并发,而不是 java.util.Stream
。)
不确定这是否重要,但处理程序 4 的逻辑也可以分为两部分,一部分需要更新处理程序 1-3 的 none,下一部分只需要处理程序 4 的第一部分已经完成。因此处理程序 1-3 可以在处理程序 4 的第二部分仍在执行时处理消息。
有没有办法做到这一点?或者也许我的设计有缺陷?我觉得应该有一种方法可以通过 SequenceBarrier
来做到这一点,但我不太明白如何实现这个自定义障碍。对于处理程序 1-3,我想我想用逻辑 handlers[1:3].lastProcessedSequence() == handlers[4].lastProcessedSequence()
设置障碍,但我不确定该逻辑放在哪里。
谢谢!
我会考虑让处理程序是无状态的,并使用它们处理的消息来包含您的系统状态。这样你就根本不需要同步你的处理程序。
目标
我正在尝试在处理程序之间创建一种有点循环的依赖关系,但我不太清楚如何正确处理。我想要实现的是 producer -> [handlers 1-3] -> handler 4
.
所以,disruptor.handleEventsWith(h1, h2, h3).then(h4);
。但我有额外的要求
- 虽然处理程序 1-3 会并行处理消息,但其中 none 会开始处理下一条消息,直到他们都处理完上一条消息。
- 在第一条消息之后,处理程序 1-3 等待处理程序 4 处理完最近的消息,然后再处理下一条消息。
使用单个事件处理程序的等效执行逻辑可以是:
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
Arrays.asList(h1, h2, h3).parallelStream()
.forEach(h -> h.onEvent(event, sequence, endOfBatch));
h4.onEvent(event, sequence, endOfBatch);
});
上下文
设计上下文是处理程序 1-3 各自根据消息更新自己的状态,并且在消息被三个中的每一个处理后它们处于一致的状态。处理程序 4 然后根据处理程序 1-3 更新的状态运行一些逻辑。因此处理程序 4 应该只看到由 1-3 维护的数据结构的一致状态,这意味着处理程序 1-3 不应该在处理程序 4 完成之前处理下一条消息。
(虽然目标肯定是使用 Disruptor 来管理并发,而不是 java.util.Stream
。)
不确定这是否重要,但处理程序 4 的逻辑也可以分为两部分,一部分需要更新处理程序 1-3 的 none,下一部分只需要处理程序 4 的第一部分已经完成。因此处理程序 1-3 可以在处理程序 4 的第二部分仍在执行时处理消息。
有没有办法做到这一点?或者也许我的设计有缺陷?我觉得应该有一种方法可以通过 SequenceBarrier
来做到这一点,但我不太明白如何实现这个自定义障碍。对于处理程序 1-3,我想我想用逻辑 handlers[1:3].lastProcessedSequence() == handlers[4].lastProcessedSequence()
设置障碍,但我不确定该逻辑放在哪里。
谢谢!
我会考虑让处理程序是无状态的,并使用它们处理的消息来包含您的系统状态。这样你就根本不需要同步你的处理程序。