为什么这些 RxJS observables 会产生奇怪的输出?
Why are these RxJS observables producing strange outputs?
我有两个从同一来源创建的可观察对象。它们通过将随机值分配给正在发射的元素的 属性 的映射来区分。这是逻辑示例:
var Rx = require('rx');
var _ = require('lodash');
// create a source that emits a single event, and map that to an empty object
var source = Rx.Observable
.range(0, 1)
.map(function makeObject() { return {}; });
// map the empty object and give each one a type property with the
// value randomly chosen between "a" or "b"
var typed = source.map(function type(obj) {
obj.type = _.sample(['a', 'b']); // obj.type will randomly be 'a' or 'b'
return obj;
});
// create an observable that only contains "a"
var a = typed.filter(function(obj) {
return obj.type === 'a';
});
// create an observable that only contains "b"
var b = typed.filter(function(obj) {
return obj.type === 'b';
});
// merge both observables and log the result in the subscription
Rx.Observable.merge(a, b).subscribe(function(obj) {
console.log(obj);
});
我希望这个最终的合并流将始终生成一个具有 obj.type === 'a'
或 obj.type === 'b'
的对象,然后完成。
然而,每次我 运行 这个脚本我都会得到不同的结果,有些是意料之中的,有些是意想不到的。
预期结果"a":
{ type : 'a' }
预期结果"b":
{ type : 'b' }
两者都出乎意料:
{ type : 'a' }
{ type : 'b' }
而且,有时我根本没有输出。我在这里错过了什么?
这个问题与 RX 的惰性有关:
您有两个由合并调用创建的订阅,每个订阅都会导致对所有可观察运算符的评估。
这意味着:
订阅 a -> 可能导致:
- 项目 a 生成并发出。
- 生成项目b然后过滤掉
订阅 b -> 相同,或者:
- 项目 b 生成然后发出
- 生成项目a然后过滤掉
如果合并这些流,您将得到以下任一结果:
只有a,只有b,a&b都没有。
更多详情
让我们看一个更简单的例子:
var source = Rx.Observable
.range(0, 1)
.map(function () { return Math.random(); })
现在在常规的 pub-sub 系统中,我们希望如果我添加 2 个订阅者,每个订阅者输出相同的值:
source.subscribe(function(x){console.log("sub 1:" + x)})
source.subscribe(function(x){console.log("sub 2:" + x)})
只有它们不是,每个都会打印不同的值,因为每个订阅都会再次调用 Math.Random()。
虽然有点奇怪,但它实际上是 rx observable 的正确行为,每个新订阅都会导致对 observable 流的新评估。
合并订阅这两个 observable(这意味着创建了两个值而不是一个)并将这些值发送到一个新的 observable。
为了避免这种行为,我们可以使用 RX 的发布操作符。
这里有更详细的解释:
http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html
因此,在这种情况下:
var source = Rx.Observable
.range(0, 1)
.map(function makeObject() { return {}; });
var typed = source.map(function type(obj) {
obj.type = _.sample(['a', 'b']); // obj.type will randomly be 'a' or 'b'
return obj;
}).replay().refCount();
我有两个从同一来源创建的可观察对象。它们通过将随机值分配给正在发射的元素的 属性 的映射来区分。这是逻辑示例:
var Rx = require('rx');
var _ = require('lodash');
// create a source that emits a single event, and map that to an empty object
var source = Rx.Observable
.range(0, 1)
.map(function makeObject() { return {}; });
// map the empty object and give each one a type property with the
// value randomly chosen between "a" or "b"
var typed = source.map(function type(obj) {
obj.type = _.sample(['a', 'b']); // obj.type will randomly be 'a' or 'b'
return obj;
});
// create an observable that only contains "a"
var a = typed.filter(function(obj) {
return obj.type === 'a';
});
// create an observable that only contains "b"
var b = typed.filter(function(obj) {
return obj.type === 'b';
});
// merge both observables and log the result in the subscription
Rx.Observable.merge(a, b).subscribe(function(obj) {
console.log(obj);
});
我希望这个最终的合并流将始终生成一个具有 obj.type === 'a'
或 obj.type === 'b'
的对象,然后完成。
然而,每次我 运行 这个脚本我都会得到不同的结果,有些是意料之中的,有些是意想不到的。
预期结果"a":
{ type : 'a' }
预期结果"b":
{ type : 'b' }
两者都出乎意料:
{ type : 'a' }
{ type : 'b' }
而且,有时我根本没有输出。我在这里错过了什么?
这个问题与 RX 的惰性有关:
您有两个由合并调用创建的订阅,每个订阅都会导致对所有可观察运算符的评估。
这意味着:
订阅 a -> 可能导致:
- 项目 a 生成并发出。
- 生成项目b然后过滤掉
订阅 b -> 相同,或者:
- 项目 b 生成然后发出
- 生成项目a然后过滤掉
如果合并这些流,您将得到以下任一结果: 只有a,只有b,a&b都没有。
更多详情
让我们看一个更简单的例子:
var source = Rx.Observable
.range(0, 1)
.map(function () { return Math.random(); })
现在在常规的 pub-sub 系统中,我们希望如果我添加 2 个订阅者,每个订阅者输出相同的值:
source.subscribe(function(x){console.log("sub 1:" + x)})
source.subscribe(function(x){console.log("sub 2:" + x)})
只有它们不是,每个都会打印不同的值,因为每个订阅都会再次调用 Math.Random()。
虽然有点奇怪,但它实际上是 rx observable 的正确行为,每个新订阅都会导致对 observable 流的新评估。
合并订阅这两个 observable(这意味着创建了两个值而不是一个)并将这些值发送到一个新的 observable。
为了避免这种行为,我们可以使用 RX 的发布操作符。 这里有更详细的解释:
http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html
因此,在这种情况下:
var source = Rx.Observable
.range(0, 1)
.map(function makeObject() { return {}; });
var typed = source.map(function type(obj) {
obj.type = _.sample(['a', 'b']); // obj.type will randomly be 'a' or 'b'
return obj;
}).replay().refCount();