RxJava onBackPressureBuffer 和 onBackPressureDrop 混淆

Confusion on RxJava onBackPressureBuffer and onBackPressureDrop

我希望我的代码像这张图一样工作,但它不起作用...

我的代码:

    private fun <T> register(cls: Class<T>): Flowable<Pair<T, Long>> {
        return FlowableFromObservable(mRelay).onBackpressureBuffer(4).filter(
            /* filter target event. */
            EventPredictable(cls)
        ).cast(
            /* cast to target event */
            cls
        ).onBackpressureDrop {
            Log.i(TAG, "drop event: $it")
        }.concatMap { data ->
            /* start interval task blocking */
            val period = 1L
            val unit = TimeUnit.SECONDS
            MLog.d(TAG, "startInterval: data = $data")
            Flowable.interval(0, period, unit).take(DURATION.toLong()).takeUntil(
                getStopFlowable()
            ).map {
                Pair(data, it)
            }
        }
    }

    private fun getStopFlowable(): Flowable<StopIntervalEvent> {
        return RxBus.getDefault().register(StopIntervalEvent::class.java)
                    .toFlowable(BackpressureStrategy.LATEST)
    }

当我在 10 毫秒内发送 140 个事件时,我的代码丢弃了 12 个事件,而不是我期望的丢弃 140 - 4 = 136 个事件。为什么我的代码不像上图那样工作?感谢您的收看和解答!

onBackpressurDrop 随时准备接收物品,因此 onBackpressureBuffer 对您的设置没有实际影响。 onBackpressurBuffer(int) 会在溢出时失败,所以你永远不会看到预期的行为。此外,concatMap 默认情况下预先获取 2 个项目,因此它将获得源项目 1 和 2。

相反,尝试使用 overload 和可配置的背压策略:

 mRelay
 .toFlowable(BackpressureStaregy.MISSING)
 .onBackpressureBuffer(4, null, BackpressureOverflowStrategy.DROP_LATEST)
 .flatMap(data -> 
     Flowable.intervalRange(0, DURATION.toLong(), 0, period, unit)
     .takeUntil(getStopFlowable())
     .map(it -> new Pair(data, it))
     , 1 // <--------------------------------------------------- max concurrency
 );