如何在背压期间仅缓冲来自 rx.Observable 的最新排放

How to buffer only latest emission from rx.Observable during backpressure

我有一个 rx.Observable,它将任务的进度发送到 onNext()onNext() 排放有时会发生得如此之快以至于 Observer 无法跟上,导致 backpressure。我想通过缓冲 Observable.

的最新排放来处理背压

例如:

这似乎是在 Rx Observable 中处理进度的常见情况,因为您通常只关心用最新的进度信息更新 UI。但是我一直无法弄清楚如何做到这一点。

有人知道如何用 RxJava 实现吗?

Observable.debounce 听起来正是您所需要的。在下面的示例中,每 200 毫秒 window 中仅可观察到的最新发射将发送给观察者。

observable
    .debounce(200, TimeUnit.MILLISECONDS)
    .subscribe(observer);

onBackPressureLatest 是你的朋友。 :) http://reactivex.io/RxJava/javadoc/rx/Observable.html#onBackpressureLatest()