共享冷热可观察量

Sharing cold and hot observables

我对使用 Rx.Observable.just 创建的 shared 流的行为感到困惑。

例如:

var log = function(x) { console.log(x); };

var cold = Rx.Observable
  .just({ foo: 'cold' });

cold.subscribe(log); // <-- Logs three times
cold.subscribe(log);
cold.subscribe(log);

var coldShare = Rx.Observable
  .just({ foo: 'cold share' })
  .share();

coldShare.subscribe(log); // <-- Only logs once
coldShare.subscribe(log);
coldShare.subscribe(log);

两个流都只发出一个事件,但未共享的事件可以订阅三次。这是为什么?

我需要 "fork" 一个流但共享它的值(然后合并分叉的流)。

如何共享流的价值,同时多次订阅它?

我意识到这可能与 "cold" 和 "hot" observables 的概念有关。然而:

Is the stream created by Rx.Observable.just() cold or hot?

冷。

How is one supposed to determine the answer to the previous question?

我想文档是唯一的指南。

How can I share the value of a stream but also subscribe to it multiple times?

您正在寻找 a connectable observable 的想法。例如:

var log = function(x) { console.log(x); };
var coldShare = Rx.Observable
  .just({ foo: 'cold share' })
  .publish();

coldShare.subscribe(log); // Does nothing
coldShare.subscribe(log); // Does nothing
coldShare.subscribe(log); // Does nothing

coldShare.connect(); // Emits one value to its three subscribers (logs three times)

var log = function(x) {
  document.write(JSON.stringify(x));
  document.write("<br>");
};

var coldShare = Rx.Observable
  .just({ foo: 'cold share' })
  .publish();

coldShare.subscribe(log); // <-- Only logs once
coldShare.subscribe(log);
coldShare.subscribe(log);

coldShare.connect();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.min.js"></script>

上面的例子记录了三次。使用 publish and connect,在调用 connect.

之前,您本质上是 "pause" 可观察对象

另请参阅:

我不明白你的第一个问题,但关于最后一个问题,因为我也遇到了那个问题:

  • Observables/Observers的Rxjs实现是基于观察者模式的,类似于老式的回调机制
  • 举例来说,这是创建可观察对象的基本形式(取自 https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/create.md 的文档)

    var source = Rx.Observable.create(function (observer) {
        observer.onNext(42);
        observer.onCompleted();
    
        // Note that this is optional, you do not have to return this if you require no cleanup
        return function () {
            console.log('disposed');
        };
    });
    
  • Rx.Observable.create 将一个函数(假设 factory_fn 是原始的)作为参数,该函数需要一个观察者。您的值是通过您在 factory_fn 的主体中选择的计算生成的,并且因为您在参数中有观察者,所以您可以 process/push 在您认为合适时生成的值。但是 factory_fn 没有被执行,它只是被注册了(就像回调一样)。每当相关的可观察对象上有 subscribe(observer) 时(即由 Rx.Observable.create(factory_fn) 编辑的 return 时,它就会被调用。

  • 一旦订阅完成(调用创建回调),值就会根据工厂函数中的逻辑流向您的观察者,并且一直保持这种状态,直到您的观察者完成或观察者取消订阅(假设您确实实施了取消操作作为 factory_fn 的 return 值的价值流)。
  • 这基本上意味着默认情况下,Rx.Observables 是冷的。
  • 我在使用了相当多的库后得出的结论是,除非有适当的文档记录,否则要确定可观察对象温度的唯一方法是查看源代码。或者在某处添加副作用,订阅两次,看看副作用是发生了两次还是只发生了一次(你就是这么做的)。那,或者在 Whosebug 上问问。
  • 例如,Rx.fromEvent 产生热可观察值,正如您从代码的最后一行 (return new EventObservable(element, eventName, selector).publish().refCount();) 中看到的那样。 (这里的代码:https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/fromevent.js)。 publish 操作符是将冷可观察对象转换为热可观察对象的操作符之一。它是如何工作的超出了范围,所以我不会在这里详细说明。
  • 但是 Rx.DOM.fromWebSocket 不会产生热观测值 (https://github.com/Reactive-Extensions/RxJS-DOM/blob/master/src/dom/websocket.js). Cf.
  • 我认为混淆经常来自于我们将实际来源(比如按钮点击流)与其表示(Rx.Observable)混为一谈的事实。不幸的是,当这种情况发生时,我们想象中的热源最终可能会变成冷源 Rx.Observable

所以,是的,Rx.Observable.just 创建了冷可观察对象。