当我将延迟运算符添加到第二个订阅时,共享可观察对象变为单播
Sharing an observable becomes unicast when I add delay operator to a 2nd subscription
以下代码按预期工作:
const source = interval(1000).pipe(
take(5),
share()
);
source.subscribe(x => console.log('c1', x));
setTimeout(() => {
source.subscribe(x => console.log('c2', x));
}, 2000);
生成以下输出:
c1 0
c1 1
c1 2
c2 2
c1 3
c2 3
c1 4
c2 4
但是当我将第二个订阅更改为使用 delay(2000) 而不是 setTimeout()
我得到一个未共享的不同流。
const source = interval(1000).pipe(
take(5),
share()
);
source.subscribe(x => console.log('c1', x));
source.pipe(delay(2000)).subscribe(x => console.log('c2', x));
产生这个输出:
c1 0
c1 1
c1 2
c2 0
c1 3
c2 1
c1 4
c2 2
c2 3
c2 4
如何让第二个订阅者使用共享流?
我显然不完全理解 RX 运算符是如何在幕后工作的。
使用 source.pipe(delay(2000))
与使用 setTimeout()
完全不同。 delay()
运营商将延迟其来源的每次发射,这意味着您仍在立即进行两次订阅。
您可能想要做的是:
of(null)
.pipe(
delay(2000),
switchMapTo(source),
)
.subscribe();
或者这应该做同样的事情:
concat(timer(2000), source)
.subscribe();
视觉效果
这是您的原始流,其中 c2 在两秒后订阅
src: 0-1-2-3-4
c1: 0-1-2-3-4
c2: ----2-3-4
以及您的流,其中 c2 立即订阅并延迟每次发射 2 秒
src: 0-1-2-3-4
c1: 0-1-2-3-4
c2: ----0-1-2-3-4
共享来源
共享源与具有相同的输出不同。
考虑一下:
const src = interval(1000).pipe(
take(5),
map(x => x + 5),
share()
);
src.pipe(map(x => x - 1)).subscribe(console.log); // c1
src.pipe(map(x => x + 1)).subscribe(console.log); // c2
输出:
src: 5-6-7-8-9
c1: 4-5-6-7-8
c2: 6-7-8-9-10
尽管它们都有不同的输出。 c1
和 c2
都有相同的来源。 c1
和 c2
不生成任何数字,它们只是从给定的任何数字中加 1 和减 1。他们改变了来源。
这与您在第二个示例中所做的相同。不是转换数字,而是在发射发生时延迟发生变化。 c2
发射与源相同的流,它在 2 秒后才开始发射,并且在源完成后 2 秒仍在发射。
使用 setTimeout 延迟
const source = interval(1000).pipe(
take(5),
share()
);
source.subscribe(x => console.log('c1', x));
source.subscribe(x =>
setTimeout(_ =>
console.log('c2', x),
2000
)
);
订阅前等待
const source = interval(1000).pipe(
take(5),
share()
);
source.subscribe(x => console.log('c1', x));
timer(2000).pipe(
switchMap(_ => source)
).subscribe(x => console.log('c2', x));
以下代码按预期工作:
const source = interval(1000).pipe(
take(5),
share()
);
source.subscribe(x => console.log('c1', x));
setTimeout(() => {
source.subscribe(x => console.log('c2', x));
}, 2000);
生成以下输出: c1 0 c1 1 c1 2 c2 2 c1 3 c2 3 c1 4 c2 4
但是当我将第二个订阅更改为使用 delay(2000) 而不是 setTimeout() 我得到一个未共享的不同流。
const source = interval(1000).pipe(
take(5),
share()
);
source.subscribe(x => console.log('c1', x));
source.pipe(delay(2000)).subscribe(x => console.log('c2', x));
产生这个输出:
c1 0 c1 1 c1 2 c2 0 c1 3 c2 1 c1 4 c2 2 c2 3 c2 4
如何让第二个订阅者使用共享流? 我显然不完全理解 RX 运算符是如何在幕后工作的。
使用 source.pipe(delay(2000))
与使用 setTimeout()
完全不同。 delay()
运营商将延迟其来源的每次发射,这意味着您仍在立即进行两次订阅。
您可能想要做的是:
of(null)
.pipe(
delay(2000),
switchMapTo(source),
)
.subscribe();
或者这应该做同样的事情:
concat(timer(2000), source)
.subscribe();
视觉效果
这是您的原始流,其中 c2 在两秒后订阅
src: 0-1-2-3-4
c1: 0-1-2-3-4
c2: ----2-3-4
以及您的流,其中 c2 立即订阅并延迟每次发射 2 秒
src: 0-1-2-3-4
c1: 0-1-2-3-4
c2: ----0-1-2-3-4
共享来源
共享源与具有相同的输出不同。
考虑一下:
const src = interval(1000).pipe(
take(5),
map(x => x + 5),
share()
);
src.pipe(map(x => x - 1)).subscribe(console.log); // c1
src.pipe(map(x => x + 1)).subscribe(console.log); // c2
输出:
src: 5-6-7-8-9
c1: 4-5-6-7-8
c2: 6-7-8-9-10
尽管它们都有不同的输出。 c1
和 c2
都有相同的来源。 c1
和 c2
不生成任何数字,它们只是从给定的任何数字中加 1 和减 1。他们改变了来源。
这与您在第二个示例中所做的相同。不是转换数字,而是在发射发生时延迟发生变化。 c2
发射与源相同的流,它在 2 秒后才开始发射,并且在源完成后 2 秒仍在发射。
使用 setTimeout 延迟
const source = interval(1000).pipe(
take(5),
share()
);
source.subscribe(x => console.log('c1', x));
source.subscribe(x =>
setTimeout(_ =>
console.log('c2', x),
2000
)
);
订阅前等待
const source = interval(1000).pipe(
take(5),
share()
);
source.subscribe(x => console.log('c1', x));
timer(2000).pipe(
switchMap(_ => source)
).subscribe(x => console.log('c2', x));