如何在背压期间仅缓冲来自 rx.Observable 的最新排放
How to buffer only latest emission from rx.Observable during backpressure
我有一个 rx.Observable
,它将任务的进度发送到 onNext()
。 onNext()
排放有时会发生得如此之快以至于 Observer
无法跟上,导致 backpressure。我想通过缓冲 Observable
.
的最新排放来处理背压
例如:
Observable
发出 1 并且 Observer
接收 1.
- 虽然
Observer
仍在处理 1,Observable
发出 2,3 ,和 4。
Observer
完成处理 1 并开始处理 4(排放 2和 3 被删除)。
这似乎是在 Rx Observable 中处理进度的常见情况,因为您通常只关心用最新的进度信息更新 UI。但是我一直无法弄清楚如何做到这一点。
有人知道如何用 RxJava 实现吗?
Observable.debounce
听起来正是您所需要的。在下面的示例中,每 200 毫秒 window 中仅可观察到的最新发射将发送给观察者。
observable
.debounce(200, TimeUnit.MILLISECONDS)
.subscribe(observer);
onBackPressureLatest
是你的朋友。 :)
http://reactivex.io/RxJava/javadoc/rx/Observable.html#onBackpressureLatest()
我有一个 rx.Observable
,它将任务的进度发送到 onNext()
。 onNext()
排放有时会发生得如此之快以至于 Observer
无法跟上,导致 backpressure。我想通过缓冲 Observable
.
例如:
Observable
发出 1 并且Observer
接收 1.- 虽然
Observer
仍在处理 1,Observable
发出 2,3 ,和 4。 Observer
完成处理 1 并开始处理 4(排放 2和 3 被删除)。
这似乎是在 Rx Observable 中处理进度的常见情况,因为您通常只关心用最新的进度信息更新 UI。但是我一直无法弄清楚如何做到这一点。
有人知道如何用 RxJava 实现吗?
Observable.debounce
听起来正是您所需要的。在下面的示例中,每 200 毫秒 window 中仅可观察到的最新发射将发送给观察者。
observable
.debounce(200, TimeUnit.MILLISECONDS)
.subscribe(observer);
onBackPressureLatest
是你的朋友。 :)
http://reactivex.io/RxJava/javadoc/rx/Observable.html#onBackpressureLatest()