RxJava onBackPressureBuffer 和 onBackPressureDrop 混淆
Confusion on RxJava onBackPressureBuffer and onBackPressureDrop
我希望我的代码像这张图一样工作,但它不起作用...
我的代码:
private fun <T> register(cls: Class<T>): Flowable<Pair<T, Long>> {
return FlowableFromObservable(mRelay).onBackpressureBuffer(4).filter(
/* filter target event. */
EventPredictable(cls)
).cast(
/* cast to target event */
cls
).onBackpressureDrop {
Log.i(TAG, "drop event: $it")
}.concatMap { data ->
/* start interval task blocking */
val period = 1L
val unit = TimeUnit.SECONDS
MLog.d(TAG, "startInterval: data = $data")
Flowable.interval(0, period, unit).take(DURATION.toLong()).takeUntil(
getStopFlowable()
).map {
Pair(data, it)
}
}
}
private fun getStopFlowable(): Flowable<StopIntervalEvent> {
return RxBus.getDefault().register(StopIntervalEvent::class.java)
.toFlowable(BackpressureStrategy.LATEST)
}
当我在 10 毫秒内发送 140 个事件时,我的代码丢弃了 12 个事件,而不是我期望的丢弃 140 - 4 = 136 个事件。为什么我的代码不像上图那样工作?感谢您的收看和解答!
onBackpressurDrop
随时准备接收物品,因此 onBackpressureBuffer
对您的设置没有实际影响。 onBackpressurBuffer(int)
会在溢出时失败,所以你永远不会看到预期的行为。此外,concatMap
默认情况下预先获取 2 个项目,因此它将获得源项目 1 和 2。
相反,尝试使用 overload 和可配置的背压策略:
mRelay
.toFlowable(BackpressureStaregy.MISSING)
.onBackpressureBuffer(4, null, BackpressureOverflowStrategy.DROP_LATEST)
.flatMap(data ->
Flowable.intervalRange(0, DURATION.toLong(), 0, period, unit)
.takeUntil(getStopFlowable())
.map(it -> new Pair(data, it))
, 1 // <--------------------------------------------------- max concurrency
);
我希望我的代码像这张图一样工作,但它不起作用...
我的代码:
private fun <T> register(cls: Class<T>): Flowable<Pair<T, Long>> {
return FlowableFromObservable(mRelay).onBackpressureBuffer(4).filter(
/* filter target event. */
EventPredictable(cls)
).cast(
/* cast to target event */
cls
).onBackpressureDrop {
Log.i(TAG, "drop event: $it")
}.concatMap { data ->
/* start interval task blocking */
val period = 1L
val unit = TimeUnit.SECONDS
MLog.d(TAG, "startInterval: data = $data")
Flowable.interval(0, period, unit).take(DURATION.toLong()).takeUntil(
getStopFlowable()
).map {
Pair(data, it)
}
}
}
private fun getStopFlowable(): Flowable<StopIntervalEvent> {
return RxBus.getDefault().register(StopIntervalEvent::class.java)
.toFlowable(BackpressureStrategy.LATEST)
}
当我在 10 毫秒内发送 140 个事件时,我的代码丢弃了 12 个事件,而不是我期望的丢弃 140 - 4 = 136 个事件。为什么我的代码不像上图那样工作?感谢您的收看和解答!
onBackpressurDrop
随时准备接收物品,因此 onBackpressureBuffer
对您的设置没有实际影响。 onBackpressurBuffer(int)
会在溢出时失败,所以你永远不会看到预期的行为。此外,concatMap
默认情况下预先获取 2 个项目,因此它将获得源项目 1 和 2。
相反,尝试使用 overload 和可配置的背压策略:
mRelay
.toFlowable(BackpressureStaregy.MISSING)
.onBackpressureBuffer(4, null, BackpressureOverflowStrategy.DROP_LATEST)
.flatMap(data ->
Flowable.intervalRange(0, DURATION.toLong(), 0, period, unit)
.takeUntil(getStopFlowable())
.map(it -> new Pair(data, it))
, 1 // <--------------------------------------------------- max concurrency
);