了解 RxJava onBackpressureBuffer 中的容量参数

Understanding the capacity param in RxJava onBackpressureBuffer

这是我写的一个小示例应用程序:

package ru.maksim.sample.app

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import kotlinx.android.synthetic.main.activity_main.*
import java.util.concurrent.TimeUnit


class MainActivity : AppCompatActivity() {
    private val subject = PublishSubject.create<Int>()
    private lateinit var disposable: Disposable
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        disposable = observeInts()
            .subscribe(
                {
                    Log.d("SampleApp", "next=$it")
                },
                {
                    Log.e("SampleApp", "error", it)
                },
                {
                    Log.d("SampleApp", "complete")
                }
            )
        start.setOnClickListener {
            subject.onNext(1)
        }
    }

    override fun onDestroy() {
        disposable.dispose()
        super.onDestroy()
    }

    private fun observeInts() = subject
        .toFlowable(BackpressureStrategy.BUFFER)
        .onBackpressureBuffer(4, {
            Log.d("SampleApp", "Overflow")
        }, BackpressureOverflowStrategy.DROP_LATEST)
        .observeOn(Schedulers.computation())
        .flatMap {
            Log.d("SampleApp", "onNext BEFORE delay: $it")
            Flowable.just(it)
        }
        .delay(10L, TimeUnit.SECONDS)
        .flatMap {
            Log.d("SampleApp", "onNext AFTER delay: $it")
            Flowable.just(it)
        }
}

start 只是一个按钮。按下按钮超过4次(4是onBackpressureBuffer中可以看到的缓冲容量)次后,我希望看到Overflow是日志,但没有发生。我不明白为什么。

我认为您没有看到警告,因为事件很少。 您可以尝试将您在 setOnClickListener 中的回调替换为这个并再次检查:

start.setOnClickListener {
    for (i in 0..1000) {
        subject.onNext(i)
    }   
}

您也可以使用方法 "observeOn" 将行更改为:

.observeOn(Schedulers.computation(), false, 1)

因为调度程序也有他的缓冲区。

我想我找到了答案here。即,

onBackpressureBuffer(int capacity)

这是一个有界版本,它会在缓冲区达到给定容量时发出 BufferOverflowError 信号。

Flowable.range(1, 1_000_000)
          .onBackpressureBuffer(16)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

随着越来越多的运算符现在允许设置其缓冲区大小,此运算符的相关性正在降低。对于其余部分,这为 "extend their internal buffer" 提供了一个机会,方法是使用比默认值更大的 onBackpressureBuffer 数字。


看起来除了 16 传递给 onBackpressureBuffer 之外,其他运算符也有自己的缓冲区。而当收到第 17 项时,前面的 16 项可能会缓存在不同运营商的缓冲区中。