StringObservable.from(InputStream).share() 立即导致 MissingBackPressure
StringObservable.from(InputStream).share() cause immediate MissingBackPressure
我想在 Observable
中读取我的 InputStream
并发出已解析的数据(比方说 DataPacket
)。我还希望有不同的 subscribers
来处理不同类型的 DataPacket
(每个 subscriber
都会将其自己的过滤器应用于初始 observable
)。这意味着 Observable
应该在不同的 subscribers
之间共享状态。我决定使用 share()
但遇到 MissingBackpressureException
.
以下代码失败:
readSubscription = StringObservable.from(mInStream,1024)
.share()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Action1<byte[]>() {
@Override
public void call(byte[] bytes) {
}
});
我在 subscribe
方法中什么都不做 - subscriber
应该足够快了。
一切都很好,如果我删除 share()
。此代码有效:
readSubscription = StringObservable.from(mInStream,1024)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Action1<byte[]>() {
@Override
public void call(byte[] bytes) {
}
});
我明白,share()
可能是一项昂贵的操作,我的 InputStream
会产生大量消息(每秒约 100 条消息)。
我的问题:如何实现读取InputStream
的Observable
并在不同的Subscribers
.[=32=之间共享状态]
当前的 v0.22 不支持背压,所以你现在应该使用 onBackpressureBuffer
来避免 MissingBackpressureException
。我会看看我们是否可以发布应该工作的最新代码。
此外,使用 share()
可能会令人惊讶,因为它确实对订阅者进行了引用计数。您不能真正用它一次订阅所有订阅者,其中一些订阅者可能不会从一开始就收到所有值。相反,您可以使用 publish()
运算符并在所有订阅者都订阅后对返回的可观察对象调用 connect()
。
您还可以使用 cache()
,它将向任何迟到的订阅者重播源,但它也不支持背压,您也需要使用 onBackpressureBuffer
。
我想在 Observable
中读取我的 InputStream
并发出已解析的数据(比方说 DataPacket
)。我还希望有不同的 subscribers
来处理不同类型的 DataPacket
(每个 subscriber
都会将其自己的过滤器应用于初始 observable
)。这意味着 Observable
应该在不同的 subscribers
之间共享状态。我决定使用 share()
但遇到 MissingBackpressureException
.
以下代码失败:
readSubscription = StringObservable.from(mInStream,1024)
.share()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Action1<byte[]>() {
@Override
public void call(byte[] bytes) {
}
});
我在 subscribe
方法中什么都不做 - subscriber
应该足够快了。
一切都很好,如果我删除 share()
。此代码有效:
readSubscription = StringObservable.from(mInStream,1024)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Action1<byte[]>() {
@Override
public void call(byte[] bytes) {
}
});
我明白,share()
可能是一项昂贵的操作,我的 InputStream
会产生大量消息(每秒约 100 条消息)。
我的问题:如何实现读取InputStream
的Observable
并在不同的Subscribers
.[=32=之间共享状态]
当前的 v0.22 不支持背压,所以你现在应该使用 onBackpressureBuffer
来避免 MissingBackpressureException
。我会看看我们是否可以发布应该工作的最新代码。
此外,使用 share()
可能会令人惊讶,因为它确实对订阅者进行了引用计数。您不能真正用它一次订阅所有订阅者,其中一些订阅者可能不会从一开始就收到所有值。相反,您可以使用 publish()
运算符并在所有订阅者都订阅后对返回的可观察对象调用 connect()
。
您还可以使用 cache()
,它将向任何迟到的订阅者重播源,但它也不支持背压,您也需要使用 onBackpressureBuffer
。