可观察到像 Lmax Disruptor 这样的批处理

Observable to batch like Lmax Disruptor

熟悉lmax ring buffer (disruptor)的人都知道该数据结构的最大优势之一是它可以对传入事件进行批处理,当我们有一个消费者可以利用批处理使系统自动调整时负载,你扔给它的事件越多越好。

我想知道我们不能用 Observable 实现相同的效果(针对批处理功能)。我已经尝试了 Observable.buffer,但这是非常不同的,缓冲区将等待并且不会在预期数量的事件未到达时发出批处理。我们想要的完全不同。

鉴于订阅者正在等待来自 Observable<Collection<Event>> 的批次,当单个项目到达流时,它会发出单个元素批次,该批次由订阅者处理,同时它正在处理其他元素到达并被收集进入下一批,一旦订阅者完成执行,它就会获得下一批,其中包含自上次开始处理以来到达的事件数量...

因此,如果我们的订阅者足够快,可以一次处理一个事件,它就会这样做,如果负载变高,它仍将具有相同的处理频率,但每次都会处理更多事件(从而解决背压问题)... 不像缓冲区会粘住并等待批次填满。

有什么建议吗?还是我应该使用环形缓冲区?

RxJava 和 Disruptor 代表了两种不同的编程方法。

我没有使用 Disruptor 的经验,但根据视频谈话,它基本上是一个大缓冲区,生产者像消防水管一样发出数据,消费者 spin/yield/block 直到数据可用。

另一方面,RxJava 的目标是非阻塞事件传递。我们也有环形缓冲区,特别是在 observeOn 中,它充当生产者和消费者之间的异步边界,但它们要小得多,我们通过应用协程方法来避免缓冲区溢出和缓冲区膨胀。协同例程归结为发送到您的回调的回调,因此您可以回调我们的回调以按照您的节奏向您发送一些数据。此类请求的频率决定了节奏。

有些数据源不支持此类合作流式传输,并且需要 onBackpressureXXX 运算符之一,如果下游请求速度不够快,该运算符将 buffer/drop 值。

如果您认为批量处理数据比一个一个地处理数据更有效,您可以使用 buffer 运算符,它具有重载来指定缓冲区的持续时间:例如,您可以, 10 毫秒的数据,与在此持续时间内到达的值无关。

通过请求频率控制批量大小很棘手,可能会产生无法预料的后果。通常,问题在于,如果您从批处理源 request(n),则表明您可以处理 n 个元素,但源现在必须创建 n 个大小为 1 的缓冲区(因为类型是 Observable<List<T>>)。相反,如果没有请求被调用,运算符缓冲数据导致更长的缓冲区。这些行为会在处理过程中引入额外的开销,如果你真的可以跟上并且还必须将冷源变成一个消防水管(因为否则你所拥有的基本上是 buffer(1)),这本身现在会导致缓冲区膨胀。