多个订阅嵌套在一个订阅中

Multiple subscriptions nested into one subscription

我发现自己在尝试设置一个非常简单的 rxjs 订阅流程时感到困惑。将多个不相关的订阅嵌套到另一个订阅中。

我在 angular 申请中,在进行其他订阅之前我需要填写下一个主题。

这是我想要实现的嵌套版本。

subject0.subscribe(a => {

    this.a = a;

    subject1.subscribe(x => {
        // Do some stuff that require this.a to exists
    });

    subject2.subscribe(y => {
        // Do some stuff that require this.a to exists
    });

});

我知道嵌套订阅不是好的做法,我尝试使用 flatMapconcatMap 但没有真正了解如何实现这一点。

您可以使用 concat 运算符来做到这一点。

const first = of('first').pipe(tap((value) => { /* doSomething */ }));
const second = of('second').pipe(tap((value) => { /* doSomething */ }));
const third = of('third').pipe(tap((value) => { /* doSomething */ }));

concat(first, second, third).subscribe();

这样,一切都按照定义的相同顺序链接和执行。

编辑

const first = of('first').pipe(tap(value => {
  // doSomething
  combineLatest(second, third).subscribe();
}));
const second = of('second').pipe(tap(value => { /* doSomething */ }));
const third = of('third').pipe(tap(value => { /* doSomething */ }));
first.subscribe();

这样,secondthird 是 运行 异步的,一旦 first 发出。

你可以这样做:

subject$: Subject<any> = new Subject();
this.subject$.pipe(
        switchMap(() => subject0),
        tap(a => {
            this.a = a;
        }),
        switchMap(() => subject1),
        tap(x => {
            // Do some stuff that require this.a to exists
        }),
        switchMap(() => subject2),
        tap(y => {
            // Do some stuff that require this.a to exists
        })
    );

如果你想触发这个,只需调用 this.subject$.next();

编辑: 这是 forkJoin 的一种可能方法,它叫平行主题。

subject$: Subject<any> = new Subject();
    this.subject$.pipe(
        switchMap(() => subject0),
        tap(a => {
            this.a = a;
        }),
        switchMap(
            () => forkJoin(
                subject1,
                subject2
        )),
        tap([x,y] => {
          // Do some stuff that require this.a to exists
        })
    );

将每个 Observable 的数据流分开总是一个好主意,这样您以后可以轻松地组合它们。

const first$ = this.http.get('one').pipe(
  shareReplay(1)
)

shareReplay 用于使 Observable hot 因此它不会为每个订阅调用 http.get('one')

const second$ = this.first$.pipe(
  flatMap(firstCallResult => this.http.post('second', firstCallResult))
);

const third$ = this.first$.pipe(
  flatMap(firstCallResult => this.http.post('third', firstCallResult))
);

之后您可以订阅您需要的 Observables:

second$.subscribe(()=>{}) // in this case two requests will be sent - the first one (if there were no subscribes before) and the second one

third$.subscribe(() => {}) // only one request is sent - the first$ already has the response cached

如果您不想在任何地方存储 first$ 的值,只需将其转换为:

this.http.get('one').pipe(
  flatMap(firstCallResult => combineLatest([
    this.http.post('two', firstCallResult),
    this.http.post('three', firstCallResult)
  ])
).subscribe(([secondCallResult, thirdCallResult]) => {})

您也可以使用 BehaviorSubject 将值存储在其中:

const behaviorSubject = new BehaviorSubject<string>(null); // using BehaviorSubject does not require you to subscribe to it (because it's a hot Observable)
const first$ = behaviorSubject.pipe(
  filter(Boolean), // to avoid emitting null at the beginning
  flatMap(subjectValue => this.http.get('one?' + subjectValue))
)

const second$ = first$.pipe(
  flatMap(firstRes => this.http.post('two', firstRes))
)

const third$ = first$.pipe(
  flatMap(()=>{...})
)

behaviorSubject.next('1') // second$ and third$ will emit new values
behaviorSubject.next('2') // second$ and third$ will emit the updated values again