选择正确的 RxJS 结构以确保一个订阅在另一个订阅之前完成

Choosing the right RxJS construct in order to ensure one subscription completes before another one

我面临 RxJS 问题。

我的应用程序目前设计如下:我有两个不同的客户端:ClientA & ClientB 订阅了两个不同的 Observables:ObservableA & ObservableB

请注意,该应用还会改变一个名为 aVariable 的变量。

流程如下:

  1. ClientA 订阅了 ObservableA.
  2. ClientB 订阅 ObservableB.
  3. ObservableB 订阅从 aVariable 读取 false 并完成。
  4. ObservableA 订阅将 aVariable 设置为 true 并完成(晚于 ObservableB)。

而真正的目的是让 ObservableA 的订阅在 ObservableB 之前完成,这样 ClientB 就会从 aVariable 读取 true ... 或者换句话说,以某种方式确保 ObservableB 的订阅等到另一个订阅完成。

我不确定要使用什么 RxJS 构造来实现我想要的(我目前使用普通的 Observables)。我相信我在这里需要的不仅仅是普通的 Observables...

有人可以帮忙吗?

P.S。 请注意,aVariable 保存在 ngrx 商店,但我认为这与此问题无关...

P.P.S。 以上是我的真实应用程序的简化。

我认为你可以用一个中间主题来解决你的问题,当 streamB 被订阅时,你会发出一个值:

const completeStreamA = new Rx.Subject();

const streamA = Rx.Observable.never()
  .takeUntil(completeStreamA);

const streamB = Rx.Observable.of('aValueOnStreamB')
  .do(() => completeStreamA.next('complete stream A'));

//clientA subscribes immediately
streamA.subscribe(
  next => console.log('a->next->'+next),
  err => console.log('a->error->' + err.message),
  () => console.log('a->complete')
);

setTimeout(() => {
  //simulate later subscription by clientB
  streamB.subscribe(
    next => console.log('b->next->'+next),
    err => console.log('b->error->' + err.message),
    () => console.log('b->complete')
  );
}, 3 * 1000);
  
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

只有在 streamB 被订阅后,下一个值才会进入 completeStreamA 主题,这将完成 streamA。以上代码的输出:

a->complete
b->next->aValueOnStreamB
b->complete