RxJs 5 的 share() 运算符如何工作?
How does the RxJs 5 share() operator work?
我不是 100% 清楚 RxJs 5 share()
运算符的工作原理,请参阅此处 latest docs. Jsbin for the question here.
如果我创建一个包含一系列 0 到 2 的可观察对象,每个值相隔一秒:
var source = Rx.Observable.interval(1000)
.take(5)
.do(function (x) {
console.log('some side effect');
});
如果我为这个可观察对象创建两个订阅者:
source.subscribe((n) => console.log("subscriptor 1 = " + n));
source.subscribe((n) => console.log("subscriptor 2 = " + n));
我在控制台中得到了这个:
"some side effect ..."
"subscriptor 1 = 0"
"some side effect ..."
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"some side effect ..."
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"some side effect ..."
"subscriptor 2 = 2"
我以为每个订阅都会订阅同一个Observable,但好像不是这样!它就像订阅行为创建了一个完全独立的 Observable!
但是如果将 share()
运算符添加到源 observable 中:
var source = Rx.Observable.interval(1000)
.take(3)
.do(function (x) {
console.log('some side effect ...');
})
.share();
然后我们得到这个:
"some side effect ..."
"subscriptor 1 = 0"
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"subscriptor 2 = 2"
如果没有 share()
,这就是我所期望的。
这是怎么回事,share()
运算符是如何工作的?每个订阅是否创建一个新的 Observable 链?
请注意,您使用的是 RxJS v5,而您的文档 link 似乎是 RxJS v4。我不记得具体细节,但我认为 share
运营商经历了一些变化,特别是在完成和重新订阅方面,但不要相信我的话。
回到你的问题,正如你在研究中所表明的,你的期望与图书馆设计不符。 Observables 懒惰地实例化它们的数据流,具体地在订阅者订阅时启动数据流。当第二个订阅者订阅同一个 observable 时,另一个新的数据流开始,就好像它是第一个订阅者一样(所以是的,每个订阅都会创建一个新的 observable 链,正如您所说的)。这就是 RxJS 术语中创造的冷可观察对象,这是 RxJS 可观察对象的默认行为。如果您想要一个可观察对象在数据到达时将其数据发送给它拥有的订阅者,这被称为热可观察对象,获得热可观察对象的一种方法是使用 share
运算符。
您可以在此处找到图示的订阅和数据流:(这对 RxJS v4 有效,但大部分对 v5 有效)。
如果满足这 2 个条件,share 就会成为可观察的 "hot":
- 订阅人数 > 0
- 并且 observable 尚未完成
场景 1:订阅者数量 > 0 且 observable 在新订阅前未完成
var shared = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => {
console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};
var observer1 = shared.subscribe(log('observer1')),
observer2;
setTimeout(()=>{
observer2 = shared.subscribe(log('observer2'));
}, 3000);
// emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds
// another emission for both observers at: startTime + 10 seconds
场景二:新订阅前订阅人数为零。变成 "cold"
var shared = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => {
console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};
var observer1 = shared.subscribe(log('observer1')),
observer2;
setTimeout(()=>{
observer1.unsubscribe();
}, 1000);
setTimeout(()=>{
observer2 = shared.subscribe(log('observer2')); // number of subscribers is 0 at this time
}, 3000);
// observer2's onNext is called at startTime + 8 seconds
// observer2's onNext is called at startTime + 13 seconds
场景 3:当 observable 在新订阅之前完成时。变成 "cold"
var shared = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => {
console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};
var observer1 = shared.subscribe(log('observer1')),
observer2;
setTimeout(()=>{
observer2 = shared.subscribe(log('observer2'));
}, 12000);
// 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs
// 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs
我不是 100% 清楚 RxJs 5 share()
运算符的工作原理,请参阅此处 latest docs. Jsbin for the question here.
如果我创建一个包含一系列 0 到 2 的可观察对象,每个值相隔一秒:
var source = Rx.Observable.interval(1000)
.take(5)
.do(function (x) {
console.log('some side effect');
});
如果我为这个可观察对象创建两个订阅者:
source.subscribe((n) => console.log("subscriptor 1 = " + n));
source.subscribe((n) => console.log("subscriptor 2 = " + n));
我在控制台中得到了这个:
"some side effect ..."
"subscriptor 1 = 0"
"some side effect ..."
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"some side effect ..."
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"some side effect ..."
"subscriptor 2 = 2"
我以为每个订阅都会订阅同一个Observable,但好像不是这样!它就像订阅行为创建了一个完全独立的 Observable!
但是如果将 share()
运算符添加到源 observable 中:
var source = Rx.Observable.interval(1000)
.take(3)
.do(function (x) {
console.log('some side effect ...');
})
.share();
然后我们得到这个:
"some side effect ..."
"subscriptor 1 = 0"
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"subscriptor 2 = 2"
如果没有 share()
,这就是我所期望的。
这是怎么回事,share()
运算符是如何工作的?每个订阅是否创建一个新的 Observable 链?
请注意,您使用的是 RxJS v5,而您的文档 link 似乎是 RxJS v4。我不记得具体细节,但我认为 share
运营商经历了一些变化,特别是在完成和重新订阅方面,但不要相信我的话。
回到你的问题,正如你在研究中所表明的,你的期望与图书馆设计不符。 Observables 懒惰地实例化它们的数据流,具体地在订阅者订阅时启动数据流。当第二个订阅者订阅同一个 observable 时,另一个新的数据流开始,就好像它是第一个订阅者一样(所以是的,每个订阅都会创建一个新的 observable 链,正如您所说的)。这就是 RxJS 术语中创造的冷可观察对象,这是 RxJS 可观察对象的默认行为。如果您想要一个可观察对象在数据到达时将其数据发送给它拥有的订阅者,这被称为热可观察对象,获得热可观察对象的一种方法是使用 share
运算符。
您可以在此处找到图示的订阅和数据流:
如果满足这 2 个条件,share 就会成为可观察的 "hot":
- 订阅人数 > 0
- 并且 observable 尚未完成
场景 1:订阅者数量 > 0 且 observable 在新订阅前未完成
var shared = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => {
console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};
var observer1 = shared.subscribe(log('observer1')),
observer2;
setTimeout(()=>{
observer2 = shared.subscribe(log('observer2'));
}, 3000);
// emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds
// another emission for both observers at: startTime + 10 seconds
场景二:新订阅前订阅人数为零。变成 "cold"
var shared = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => {
console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};
var observer1 = shared.subscribe(log('observer1')),
observer2;
setTimeout(()=>{
observer1.unsubscribe();
}, 1000);
setTimeout(()=>{
observer2 = shared.subscribe(log('observer2')); // number of subscribers is 0 at this time
}, 3000);
// observer2's onNext is called at startTime + 8 seconds
// observer2's onNext is called at startTime + 13 seconds
场景 3:当 observable 在新订阅之前完成时。变成 "cold"
var shared = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => {
console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};
var observer1 = shared.subscribe(log('observer1')),
observer2;
setTimeout(()=>{
observer2 = shared.subscribe(log('observer2'));
}, 12000);
// 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs
// 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs