StringObservable.from(InputStream).share() 立即导致 MissingBackPressure

StringObservable.from(InputStream).share() cause immediate MissingBackPressure

我想在 Observable 中读取我的 InputStream 并发出已解析的数据(比方说 DataPacket)。我还希望有不同的 subscribers 来处理不同类型的 DataPacket(每个 subscriber 都会将其自己的过滤器应用于初始 observable)。这意味着 Observable 应该在不同的 subscribers 之间共享状态。我决定使用 share() 但遇到 MissingBackpressureException.

以下代码失败:

readSubscription = StringObservable.from(mInStream,1024)
        .share()
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .subscribe(new Action1<byte[]>() {
            @Override
            public void call(byte[] bytes) {

            }
        });

我在 subscribe 方法中什么都不做 - subscriber 应该足够快了。

一切都很好,如果我删除 share()。此代码有效:

readSubscription = StringObservable.from(mInStream,1024)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .subscribe(new Action1<byte[]>() {
            @Override
            public void call(byte[] bytes) {

            }
        });

我明白,share() 可能是一项昂贵的操作,我的 InputStream 会产生大量消息(每秒约 100 条消息)。

我的问题:如何实现读取InputStreamObservable并在不同的Subscribers.[=32=之间共享状态]

当前的 v0.22 不支持背压,所以你现在应该使用 onBackpressureBuffer 来避免 MissingBackpressureException。我会看看我们是否可以发布应该工作的最新代码。

此外,使用 share() 可能会令人惊讶,因为它确实对订阅者进行了引用计数。您不能真正用它一次订阅所有订阅者,其中一些订阅者可能不会从一开始就收到所有值。相反,您可以使用 publish() 运算符并在所有订阅者都订阅后对返回的可观察对象调用 connect()

您还可以使用 cache(),它将向任何迟到的订阅者重播源,但它也不支持背压,您也需要使用 onBackpressureBuffer