RxJava onBackpressureBuffer 不发射项目

RxJava onBackpressureBuffer not emitting items

我目睹了 onBackpressureBuffer 的奇怪行为,我不确定它是有效行为还是错误。

我正在进行一个以特定速率发射项目的 tcp 调用(使用流和 inputStream,但这只是为了一些信息)

最重要的是,我使用 create 创建了一个可观察对象,它会在每次准备就绪时发出一个项目。

我们称它为消息()。

然后我这样做:

messages()
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe({//do some work});

我注意到使用分析工具很少会抛出 MissingBackPressureException,因此我在调用中添加了 onBackpressureBuffer。

如果我在 observeOn 之后添加它:

messages()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .onBackpressureBuffer()
    .subscribe({//do some work})

一切正常,但这意味着它只有在到达 UI 主线程后才会缓冲,所以我更喜欢这样:

messages()
    .onBackpressureBuffer()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({//do some work});

这就是事情开始变得奇怪的地方。

我注意到虽然 messages() 一直在发送项目,但在某些时候它们将停止交付给订阅者。

更准确地说,恰好在 16 个项目之后,显然令人高兴的是缓冲区将开始保存项目而不向前传递它们。

一旦我用某种超时机制取消了 messages(),它将导致 messages() 发出 onError() 并且缓冲区将立即发出它保留的所有项目(它们将被处理)。

我已经检查过是否是订户做太多工作的错,但不是,他已经完成但仍然没有收到物品...

我也试过在订阅者中使用 request(n) 方法在 onNext() 完成后请求一项,但缓冲区不工作。

我怀疑主 Android UI 线程的消息传递系统带有背压导致了此问题,但我无法解释原因。

有人可以解释为什么会这样吗?这是错误还是有效行为? 谢谢!

不知道如何 messages(),根据所描述的行为,这是与

类似的同池死锁

解决方法是将 .onBackpressureBuffer 放在 subscribeOnobserveOn 之间。

messages()
.subscribeOn(Schedulers.io())
.onBackpressureBuffer()           // <---------------------
.observeOn(AndroidSchedulers.mainThread())
.subscribe({//do some work});

不同,但答案归结为同一件事。 observeOn构造函数的实现:OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize):

public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;

}

最后一行指向缓冲区大小。

buffer size on Android is 16.

解决方案只是将更大的缓冲区大小传递给 observeOn(Scheduler scheduler, int bufferSize) 运算符:

messages()
    .observeOn(AndroidSchedulers.mainThread(), {buffer_size})

注意不要设置太高的值,因为Android内存有限。