RxJS 和异步 - 了解执行流程

RxJS and async - Understand the execution flow

我很想用RxJS理解async;我写这个例子是为了看看异步函数调用在 RxJS observables 中是如何工作的;结果是:
func1 --- 10
func1 --- 20
func1 --- 40
func1 --- 30
switchMap ---1--- -->item: 0
switchMap ---1--- -->item: 1
switchMap ---1--- -->item: 2
switchMap ---1--- -->item: 3
switchMap ---1--- -->item: 4
switchMap ---1--- -->item: 5
switchMap ---1--- -->item: 6
switchMap ---1--- -->item: 7
END
switchMap ---2--- -->item: 1
switchMap ---3--- -->item: 0.111
switchMap ---3--- -->item: 1.111
switchMap ---3--- -->item: 2.111
switchMap ---3--- -->item: 3.111
switchMap ---3--- -->item: 4.111
switchMap ---3--- -->item: 5.111
switchMap ---3--- -->项目:6.111
switchMap ---3--- -->item: 7.111
switchMap ---3--- -->item: 8.111
switchMap ---3--- -->item: 9.111
MERGEMAP:项目-->1000
MERGEMAP:项目-->2000
N E X T --> 1000
N E X T --> 2000
MERGEMAP:项目-->3000
MERGEMAP:项目-->4000
N E X T --> 3000
N E X T --> 4000
MERGEMAP:项目-->5000
N E X T --> 5000
完成
我不明白为什么 func1 --- 40 出现在 func1 --- 30 之前,为什么 END 打印在 switchMap 的中间 -- -1--- 和 switchMap ---2---
谢谢

import { bufferCount, from, mergeMap, Observable, switchMap } from "rxjs";

module async1 {
  async function func1() {
    console.log("func1 --- 10");
    const loop0 = async () => {
      return from([0, 1, 2, 3, 4, 5, 6, 7]);
    };

    const loop1 = async () => {
      return 1;
    };

    const loop2 = async () => {
      const arrayLoop2 = [];
      for (let i = 0; i < 10; i++) {
        arrayLoop2.push(i + 0.111);
      }
      return arrayLoop2;
    };

    const loop3 = async () => {
      const arrayLoop3 = [1000, 2000, 3000, 4000, 5000];
      return from(arrayLoop3);
    };

    let myObservable: Observable<number>;

    console.log("func1 --- 20");
    loop0().then((value) => {
      myObservable = value;
      const myPipeline = myObservable!.pipe(
        switchMap((item) => {
          console.log(`switchMap ---1--- -->item: ${item}`);
          const loop1Result = from(loop1());
          return loop1Result;
        }),
        switchMap((item) => {
          console.log(`switchMap ---2--- -->item: ${item}`);
          const loop2Result = loop2();
          return loop2Result;
        }),
        switchMap((items) => {
          items.forEach((item) => {
            console.log(`switchMap ---3--- -->item: ${item}`);
          });
          const loop3Result = loop3();
          return loop3Result;
        }),
        switchMap((item) => {
          return item;
        }),
        bufferCount(2),
        mergeMap((items) => {
          items.forEach((item) => {
            console.log(`MERGEMAP: item-->${item}`);
          });
          return items;
        }, 3)
      );

      console.log("func1 --- 30");

      const mySubscription = myPipeline.subscribe({
        next: (item) => {
          console.log(`N E X T --> ${item}`);
        },
        error: () => {
          console.log(`E R R O R`);
        },
        complete: () => {
          console.log(`C O M P L E T E`);
          mySubscription.unsubscribe();
        },
      });
    });
    console.log("func1 --- 40");
    return true;
  }

  func1().then((resolve) => {
    console.log("***END***");
  });
}

您无法控制何时执行与可观察对象相关的代码,因此如果您想确保 func1() 中的整个逻辑在调用下一个 function 之前完成,您不应在 func1() 内使用可观察对象和订阅。您应该将 Promise<Type>await 结合使用。然后你可以这样调用 func1:

await func1();

如果你想在 RxJS 工作后执行函数 console.log("func1 --- 40")console.log("***END***"),你应该将它们添加到 complete 的主体中:

        complete: () => {
          console.log("func1 --- 40")
          console.log(`C O M P L E T E`);
          console.log("***END***")
          mySubscription.unsubscribe();
        }