了解 RxJava onBackpressureBuffer 中的容量参数
Understanding the capacity param in RxJava onBackpressureBuffer
这是我写的一个小示例应用程序:
package ru.maksim.sample.app
import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import kotlinx.android.synthetic.main.activity_main.*
import java.util.concurrent.TimeUnit
class MainActivity : AppCompatActivity() {
private val subject = PublishSubject.create<Int>()
private lateinit var disposable: Disposable
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
disposable = observeInts()
.subscribe(
{
Log.d("SampleApp", "next=$it")
},
{
Log.e("SampleApp", "error", it)
},
{
Log.d("SampleApp", "complete")
}
)
start.setOnClickListener {
subject.onNext(1)
}
}
override fun onDestroy() {
disposable.dispose()
super.onDestroy()
}
private fun observeInts() = subject
.toFlowable(BackpressureStrategy.BUFFER)
.onBackpressureBuffer(4, {
Log.d("SampleApp", "Overflow")
}, BackpressureOverflowStrategy.DROP_LATEST)
.observeOn(Schedulers.computation())
.flatMap {
Log.d("SampleApp", "onNext BEFORE delay: $it")
Flowable.just(it)
}
.delay(10L, TimeUnit.SECONDS)
.flatMap {
Log.d("SampleApp", "onNext AFTER delay: $it")
Flowable.just(it)
}
}
start
只是一个按钮。按下按钮超过4次(4是onBackpressureBuffer
中可以看到的缓冲容量)次后,我希望看到Overflow
是日志,但没有发生。我不明白为什么。
我认为您没有看到警告,因为事件很少。
您可以尝试将您在 setOnClickListener 中的回调替换为这个并再次检查:
start.setOnClickListener {
for (i in 0..1000) {
subject.onNext(i)
}
}
您也可以使用方法 "observeOn" 将行更改为:
.observeOn(Schedulers.computation(), false, 1)
因为调度程序也有他的缓冲区。
我想我找到了答案here。即,
onBackpressureBuffer(int capacity)
这是一个有界版本,它会在缓冲区达到给定容量时发出 BufferOverflowError 信号。
Flowable.range(1, 1_000_000)
.onBackpressureBuffer(16)
.observeOn(Schedulers.computation())
.subscribe(e -> { }, Throwable::printStackTrace);
随着越来越多的运算符现在允许设置其缓冲区大小,此运算符的相关性正在降低。对于其余部分,这为 "extend their internal buffer" 提供了一个机会,方法是使用比默认值更大的 onBackpressureBuffer 数字。
看起来除了 16
传递给 onBackpressureBuffer
之外,其他运算符也有自己的缓冲区。而当收到第 17 项时,前面的 16 项可能会缓存在不同运营商的缓冲区中。
这是我写的一个小示例应用程序:
package ru.maksim.sample.app
import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import kotlinx.android.synthetic.main.activity_main.*
import java.util.concurrent.TimeUnit
class MainActivity : AppCompatActivity() {
private val subject = PublishSubject.create<Int>()
private lateinit var disposable: Disposable
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
disposable = observeInts()
.subscribe(
{
Log.d("SampleApp", "next=$it")
},
{
Log.e("SampleApp", "error", it)
},
{
Log.d("SampleApp", "complete")
}
)
start.setOnClickListener {
subject.onNext(1)
}
}
override fun onDestroy() {
disposable.dispose()
super.onDestroy()
}
private fun observeInts() = subject
.toFlowable(BackpressureStrategy.BUFFER)
.onBackpressureBuffer(4, {
Log.d("SampleApp", "Overflow")
}, BackpressureOverflowStrategy.DROP_LATEST)
.observeOn(Schedulers.computation())
.flatMap {
Log.d("SampleApp", "onNext BEFORE delay: $it")
Flowable.just(it)
}
.delay(10L, TimeUnit.SECONDS)
.flatMap {
Log.d("SampleApp", "onNext AFTER delay: $it")
Flowable.just(it)
}
}
start
只是一个按钮。按下按钮超过4次(4是onBackpressureBuffer
中可以看到的缓冲容量)次后,我希望看到Overflow
是日志,但没有发生。我不明白为什么。
我认为您没有看到警告,因为事件很少。 您可以尝试将您在 setOnClickListener 中的回调替换为这个并再次检查:
start.setOnClickListener {
for (i in 0..1000) {
subject.onNext(i)
}
}
您也可以使用方法 "observeOn" 将行更改为:
.observeOn(Schedulers.computation(), false, 1)
因为调度程序也有他的缓冲区。
我想我找到了答案here。即,
onBackpressureBuffer(int capacity)
这是一个有界版本,它会在缓冲区达到给定容量时发出 BufferOverflowError 信号。
Flowable.range(1, 1_000_000)
.onBackpressureBuffer(16)
.observeOn(Schedulers.computation())
.subscribe(e -> { }, Throwable::printStackTrace);
随着越来越多的运算符现在允许设置其缓冲区大小,此运算符的相关性正在降低。对于其余部分,这为 "extend their internal buffer" 提供了一个机会,方法是使用比默认值更大的 onBackpressureBuffer 数字。
看起来除了 16
传递给 onBackpressureBuffer
之外,其他运算符也有自己的缓冲区。而当收到第 17 项时,前面的 16 项可能会缓存在不同运营商的缓冲区中。