如何创建永不完成的 rxjs 主题

How create never complete subject of rxjs

朋友。我通过其他可观察对象的动态订阅获得了一个 Subject c。但是想要当所有可观察到的都完成时 c 完成而没有第一个完成。

我该怎么做?

const c = new Subject;

const a = Observable.interval(100).take(3).mapTo('a');
const b = Observable.interval(150).take(3).mapTo('b');


a.subscribe(c);
b.subscribe(c);
c.subscribe(console.log);

实际输出

a
b
a
a

期待

a
b
a
a
b
b

这是正确的行为,因为 Subject 有一个内部状态,当它收到 complete 通知时,它会将自己标记为 "stopped" 并且永远不会发出任何东西。这正是你的例子中发生的事情。当您使用 a.subscribe(c) 时,您将 c 订阅所有三种类型的通知,并且当源发出 complete 时,它也会被 c 接收并停止发出。

相反,您可以 c 仅订阅 next 条通知:

a.subscribe(v => c.next(v));
b.subscribe(v => c.next(v));

然后,如果您希望源在所有源完成时正确完成,您可以执行以下操作(您还需要在所有源 Observable 上使用 share()):

const a = Observable.interval(100)...share();
const b = Observable.interval(150)...share();

...

Observable.forkJoin(a, b)
  .subscribe(() => c.complete());