如何创建永不完成的 rxjs 主题
How create never complete subject of rxjs
朋友。我通过其他可观察对象的动态订阅获得了一个 Subject c
。但是想要当所有可观察到的都完成时 c
完成而没有第一个完成。
我该怎么做?
const c = new Subject;
const a = Observable.interval(100).take(3).mapTo('a');
const b = Observable.interval(150).take(3).mapTo('b');
a.subscribe(c);
b.subscribe(c);
c.subscribe(console.log);
实际输出
a
b
a
a
期待
a
b
a
a
b
b
这是正确的行为,因为 Subject 有一个内部状态,当它收到 complete
通知时,它会将自己标记为 "stopped" 并且永远不会发出任何东西。这正是你的例子中发生的事情。当您使用 a.subscribe(c)
时,您将 c
订阅所有三种类型的通知,并且当源发出 complete
时,它也会被 c
接收并停止发出。
相反,您可以 c
仅订阅 next
条通知:
a.subscribe(v => c.next(v));
b.subscribe(v => c.next(v));
然后,如果您希望源在所有源完成时正确完成,您可以执行以下操作(您还需要在所有源 Observable 上使用 share()
):
const a = Observable.interval(100)...share();
const b = Observable.interval(150)...share();
...
Observable.forkJoin(a, b)
.subscribe(() => c.complete());
朋友。我通过其他可观察对象的动态订阅获得了一个 Subject c
。但是想要当所有可观察到的都完成时 c
完成而没有第一个完成。
我该怎么做?
const c = new Subject;
const a = Observable.interval(100).take(3).mapTo('a');
const b = Observable.interval(150).take(3).mapTo('b');
a.subscribe(c);
b.subscribe(c);
c.subscribe(console.log);
实际输出
a
b
a
a
期待
a
b
a
a
b
b
这是正确的行为,因为 Subject 有一个内部状态,当它收到 complete
通知时,它会将自己标记为 "stopped" 并且永远不会发出任何东西。这正是你的例子中发生的事情。当您使用 a.subscribe(c)
时,您将 c
订阅所有三种类型的通知,并且当源发出 complete
时,它也会被 c
接收并停止发出。
相反,您可以 c
仅订阅 next
条通知:
a.subscribe(v => c.next(v));
b.subscribe(v => c.next(v));
然后,如果您希望源在所有源完成时正确完成,您可以执行以下操作(您还需要在所有源 Observable 上使用 share()
):
const a = Observable.interval(100)...share();
const b = Observable.interval(150)...share();
...
Observable.forkJoin(a, b)
.subscribe(() => c.complete());