选择正确的 RxJS 结构以确保一个订阅在另一个订阅之前完成
Choosing the right RxJS construct in order to ensure one subscription completes before another one
我面临 RxJS 问题。
我的应用程序目前设计如下:我有两个不同的客户端:ClientA
& ClientB
订阅了两个不同的 Observables:ObservableA
& ObservableB
。
请注意,该应用还会改变一个名为 aVariable
的变量。
流程如下:
ClientA
订阅了 ObservableA
.
ClientB
订阅 ObservableB
.
ObservableB
订阅从 aVariable
读取 false
并完成。
ObservableA
订阅将 aVariable
设置为 true
并完成(晚于 ObservableB
)。
而真正的目的是让 ObservableA
的订阅在 ObservableB
之前完成,这样 ClientB
就会从 aVariable
读取 true
... 或者换句话说,以某种方式确保 ObservableB
的订阅等到另一个订阅完成。
我不确定要使用什么 RxJS 构造来实现我想要的(我目前使用普通的 Observables)。我相信我在这里需要的不仅仅是普通的 Observables...
有人可以帮忙吗?
P.S。 请注意,aVariable
保存在 ngrx 商店,但我认为这与此问题无关...
P.P.S。 以上是我的真实应用程序的简化。
我认为你可以用一个中间主题来解决你的问题,当 streamB 被订阅时,你会发出一个值:
const completeStreamA = new Rx.Subject();
const streamA = Rx.Observable.never()
.takeUntil(completeStreamA);
const streamB = Rx.Observable.of('aValueOnStreamB')
.do(() => completeStreamA.next('complete stream A'));
//clientA subscribes immediately
streamA.subscribe(
next => console.log('a->next->'+next),
err => console.log('a->error->' + err.message),
() => console.log('a->complete')
);
setTimeout(() => {
//simulate later subscription by clientB
streamB.subscribe(
next => console.log('b->next->'+next),
err => console.log('b->error->' + err.message),
() => console.log('b->complete')
);
}, 3 * 1000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
只有在 streamB 被订阅后,下一个值才会进入 completeStreamA
主题,这将完成 streamA。以上代码的输出:
a->complete
b->next->aValueOnStreamB
b->complete
我面临 RxJS 问题。
我的应用程序目前设计如下:我有两个不同的客户端:ClientA
& ClientB
订阅了两个不同的 Observables:ObservableA
& ObservableB
。
请注意,该应用还会改变一个名为 aVariable
的变量。
流程如下:
ClientA
订阅了ObservableA
.ClientB
订阅ObservableB
.ObservableB
订阅从aVariable
读取false
并完成。ObservableA
订阅将aVariable
设置为true
并完成(晚于ObservableB
)。
而真正的目的是让 ObservableA
的订阅在 ObservableB
之前完成,这样 ClientB
就会从 aVariable
读取 true
... 或者换句话说,以某种方式确保 ObservableB
的订阅等到另一个订阅完成。
我不确定要使用什么 RxJS 构造来实现我想要的(我目前使用普通的 Observables)。我相信我在这里需要的不仅仅是普通的 Observables...
有人可以帮忙吗?
P.S。 请注意,aVariable
保存在 ngrx 商店,但我认为这与此问题无关...
P.P.S。 以上是我的真实应用程序的简化。
我认为你可以用一个中间主题来解决你的问题,当 streamB 被订阅时,你会发出一个值:
const completeStreamA = new Rx.Subject();
const streamA = Rx.Observable.never()
.takeUntil(completeStreamA);
const streamB = Rx.Observable.of('aValueOnStreamB')
.do(() => completeStreamA.next('complete stream A'));
//clientA subscribes immediately
streamA.subscribe(
next => console.log('a->next->'+next),
err => console.log('a->error->' + err.message),
() => console.log('a->complete')
);
setTimeout(() => {
//simulate later subscription by clientB
streamB.subscribe(
next => console.log('b->next->'+next),
err => console.log('b->error->' + err.message),
() => console.log('b->complete')
);
}, 3 * 1000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
只有在 streamB 被订阅后,下一个值才会进入 completeStreamA
主题,这将完成 streamA。以上代码的输出:
a->complete
b->next->aValueOnStreamB
b->complete