是否可以避免嵌套订阅?

is it possible to avoid nested subscription?

我正在使用 angular 和 rxJS 我是 rxJS 运算符的新手,找不到将所有内容保存在流中的方法。

我的流很长,所以我会开门见山地解释我正在处理的问题

我需要运行 parralel Observables 可以多次触发并以相同的方式处理它们的数据,但它们的订阅需要按一定顺序进行。例如

    firstObservable$.pipe(
      map(() => secondObservable$),//this will only be triggered once
      tap(value => doSomething(value)),//data is processed once following the stream
      mergeMap(() => ThirdObservable$),//this one will be triggered multiple times but is subscribed in a particular order
      tap(value => doAnotherThing(value)),//how can I attach the processing og data to the observable?
      mergeMap(() => FourthObservable$),//this one will be triggered multiple times but is subscribed in a particular order
      tap(value => andAnother(value)),//how can I attach the processing og data to the observable?
      map(() => FifthObservable$),//this will only be triggered once
      tap(value => again(value))//data is processed once following the stream
    ).subscribe()

现在我的问题是,如果 ThirdObservable$ 被第二次触发,它将继续流的其余部分并调用 FourthObservable$FifthObservable$ 我想要相当于

    firtsObservable$.subscribe( // triggered Once
      (value) => secondObservable$.subscribe( // triggered Once
        (secondValue) => {
          processSecondValueOnce(secondValue)
          ThirdObservable$.subscribe(thirdValue => process(thirdValue)) // this can be triggered multiple times
          fourthObservable$.subscribe(fourthValue => process(fourthValue)) // this can be triggered multiple times
          fifthObservable$.subscribe( // triggered Once
            (fifthValue) => {
              process(fifthValue)
            }
          )
        }
      )
    )

如果我理解你的问题你可能想尝试这样的事情

firstObservable$.pipe(
  map(() => secondObservable$),
  tap(value_1 => doSomething(value_1)),
  mergeMap(() => {
     // create 3 separate observables, each doing its specific processing
     third = ThirdObservable$.pipe(tap(value => doAnotherThing(value));
     fourth = FourthObservable$.pipe(tap(value => andAnother(value));
     fifth = FifthObservable$).pipe(tap(value => again(value));
     // return the merge of the 3 observables
     return merge(third, fourth, fifth);
  })
).subscribe()

我有一个观点,虽然我不明白。在第一个 map 运算符中,你 return 一个 Observable (secondObservable$)。这意味着我调用的变量 value_1 将包含一个 Observable,因此 doSomething 必须是一个期望 Observable 作为输入的函数。这是你真正想要的吗?

通常,在这种情况下,您希望使用“高阶”运算符,例如 switchMapmergeMapconcatMapexaustMap。这些运算符期望作为输入的函数 return 是一个 Observable 并“扁平化” return Observable。也许这也是你想要的。