为什么在这种情况下我们需要 Publish 和 RefCount Rx 运算符?
Why do we need Publish and RefCount Rx operators in this case?
我正在尝试熟悉反应性背压处理的问题,特别是通过阅读此 wiki:https://github.com/ReactiveX/RxJava/wiki/Backpressure
In the buffer paragraph,我们有这个更复杂的示例代码:
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
如果我理解正确,我们通过为缓冲区运算符生成去抖动信号流来有效地去抖动源流。
但是为什么我们需要在这里使用 publish 和 refcount 操作符呢?如果我们放弃它们会导致什么问题?这些评论并没有让我更清楚,RxJava Observables 不是默认支持多播吗?
答案在于 hot and cold 可观察值之间的差异。
Buffer operator 结合了 2 个流,并且无法知道它们有一个共同的来源(在你的例子中)。激活(订阅)后,它将同时订阅它们,这在 return 中将触发对您的原始输入的 2 个不同的订阅。
现在可能会发生 2 种情况,要么输入是一个热可观察对象,订阅除了注册侦听器之外没有任何效果,一切都会按预期工作,或者它是一个冷可观察对象,每次订阅都会导致可能不同且不同步的流。
例如,冷可观察对象可以是在订阅时执行网络请求并通知结果的对象。不在其上调用发布意味着将完成 2 个请求。
Publish+refcount/connect 是将冷 observable 转换为热 observable 的常用方法,确保会发生单个订阅,并且所有流的行为相同。
我正在尝试熟悉反应性背压处理的问题,特别是通过阅读此 wiki:https://github.com/ReactiveX/RxJava/wiki/Backpressure
In the buffer paragraph,我们有这个更复杂的示例代码:
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
如果我理解正确,我们通过为缓冲区运算符生成去抖动信号流来有效地去抖动源流。
但是为什么我们需要在这里使用 publish 和 refcount 操作符呢?如果我们放弃它们会导致什么问题?这些评论并没有让我更清楚,RxJava Observables 不是默认支持多播吗?
答案在于 hot and cold 可观察值之间的差异。
Buffer operator 结合了 2 个流,并且无法知道它们有一个共同的来源(在你的例子中)。激活(订阅)后,它将同时订阅它们,这在 return 中将触发对您的原始输入的 2 个不同的订阅。
现在可能会发生 2 种情况,要么输入是一个热可观察对象,订阅除了注册侦听器之外没有任何效果,一切都会按预期工作,或者它是一个冷可观察对象,每次订阅都会导致可能不同且不同步的流。
例如,冷可观察对象可以是在订阅时执行网络请求并通知结果的对象。不在其上调用发布意味着将完成 2 个请求。
Publish+refcount/connect 是将冷 observable 转换为热 observable 的常用方法,确保会发生单个订阅,并且所有流的行为相同。