当我将延迟运算符添加到第二个订阅时,共享可观察对象变为单播

Sharing an observable becomes unicast when I add delay operator to a 2nd subscription

以下代码按预期工作:

    const source = interval(1000).pipe(
        take(5),    
        share()
  );

  source.subscribe(x => console.log('c1', x));
  setTimeout(() => {
    source.subscribe(x => console.log('c2', x));
  }, 2000);

生成以下输出: c1 0 c1 1 c1 2 c2 2 c1 3 c2 3 c1 4 c2 4

但是当我将第二个订阅更改为使用 delay(2000) 而不是 setTimeout() 我得到一个未共享的不同流。

    const source = interval(1000).pipe(
        take(5),    
        share()
  );

  source.subscribe(x => console.log('c1', x));

  source.pipe(delay(2000)).subscribe(x => console.log('c2', x));

产生这个输出:

c1 0 c1 1 c1 2 c2 0 c1 3 c2 1 c1 4 c2 2 c2 3 c2 4

如何让第二个订阅者使用共享流? 我显然不完全理解 RX 运算符是如何在幕后工作的。

使用 source.pipe(delay(2000)) 与使用 setTimeout() 完全不同。 delay() 运营商将延迟其来源的每次发射,这意味着您仍在立即进行两次订阅。

您可能想要做的是:

of(null)
  .pipe(
    delay(2000),
    switchMapTo(source),
  )
  .subscribe();

或者这应该做同样的事情:

concat(timer(2000), source)
  .subscribe();

视觉效果

这是您的原始流,其中 c2 在两秒后订阅

src: 0-1-2-3-4
 c1: 0-1-2-3-4
 c2: ----2-3-4

以及您的流,其中 c2 立即订阅并延迟每次发射 2 秒

src: 0-1-2-3-4
 c1: 0-1-2-3-4
 c2: ----0-1-2-3-4

共享来源

共享源与具有相同的输出不同。

考虑一下:

const src = interval(1000).pipe(
  take(5),    
  map(x => x + 5),
  share()    
);

src.pipe(map(x => x - 1)).subscribe(console.log); // c1
src.pipe(map(x => x + 1)).subscribe(console.log); // c2

输出:

src: 5-6-7-8-9
 c1: 4-5-6-7-8
 c2: 6-7-8-9-10

尽管它们都有不同的输出。 c1c2 都有相同的来源。 c1c2 不生成任何数字,它们只是从给定的任何数字中加 1 和减 1。他们改变了来源。

这与您在第二个示例中所做的相同。不是转换数字,而是在发射发生时延迟发生变化。 c2 发射与源相同的流,它在 2 秒后才开始发射,并且在源完成后 2 秒仍在发射。

使用 setTimeout 延迟

const source = interval(1000).pipe(
  take(5),    
  share()
);

source.subscribe(x => console.log('c1', x));
source.subscribe(x =>
  setTimeout(_ => 
    console.log('c2', x), 
    2000
  )
);

订阅前等待

const source = interval(1000).pipe(
  take(5),    
  share()
);
source.subscribe(x => console.log('c1', x));
timer(2000).pipe(
  switchMap(_ => source)
).subscribe(x => console.log('c2', x));