如何处理在 RxJs observable 中使用 async/awaits 引起的竞争条件

How to deal with a race condition caused by using async/awaits within an RxJs observable

我有一个 Ionic/Angular 应用程序,我在其中使用 NgRx 进行状态管理。当我加载我的数据时,我设置了各种状态标志,然后有一个名为 isLoading 的选择器,我只是在其中“和”每个加载标志。这部分似乎工作正常。

但是,我的loading indicator show和hide实际上是呈现了两种async方法。

我有下面的代码,为了show/hide这个...

 this.subs.sink = this.store$.select(fromApp.isLoading)
    .pipe(switchMap(async isLoading => {
      this.logger.info(`is loading: ${isLoading}`);
      if (isLoading) {
        await this.presentDataLoading();
        this.logger.info(`is loading presented`);
        return of(isLoading);
      } else {
        await this.dismissLoaderIfPresent();
        this.logger.info(`is loading dismissed`);
        return of(isLoading);
      }          
    })).subscribe();

这两种方法看起来像...

protected async presentDataLoading(message? : string): Promise<void> {
        if (this.loaderBase != undefined || this.isLoading) {
            return;
        }

        this.isLoading = true;
        let loaderMessage = message == undefined ? this.translateBase.instant(ResourceKeys.vals.messageLoadingData) : message;
        this.loaderBase = await this.userMessageServiceBase.presentLoadingSpinner(loaderMessage);
        this.isLoading = false;
    }


    protected async dismissLoaderIfPresent() : Promise<void> {
        if (this.loaderBase == undefined) {
            return;
        }

        await this.loaderBase.dismiss().catch(error => this.loggerBase.error(`loaderBase.dismiss: ${error}`));
        this.loaderBase = undefined;
    }

所以上面我已经使用 switchMap 来尝试解决我的竞争条件,但似乎没有帮助。

登出结果如下

    9:43:25 is loading: undefined
    9:43:25 is loading: true
    9:43:25 is loading dismissed
    9:43:25 is loading presented
    9:43:25 is loading: undefined
    9:43:25 is loading dismissed
    9:43:30 is loading: undefined
    9:43:30 is loading: true
    9:43:30 is loading dismissed
    9:43:30 is loading: undefined  <--- last emitted val is now false. Correct
    9:43:30 is loading dismissed
    9:43:31 is loading presented   <--- But I now have this just being called

所以我们可以看到要发射的 isLoading 的最后一个值是未定义的(这是正确的)。

但是我们看到日志语句 loading presented 是最后一个完成的,所以这是错误的,我有一个卡住的微调器。

所以我真的需要“锁定”“switchMap”的内容,直到承诺解决,但我不确定为什么 switchMap 不这样做,因为我没有返回可观察的(即通过 return of(isLoading); 直到等待之后。

我该如何处理这种情况?

更新

记下下面的答案,现在使用 concatMap(因此不会取消可观察值)并使用 from 将承诺转换为可观察值,我现在有以下...

 this.subs.sink = this.store$.select(fromApp.isLoading)
    .pipe(concatMap(isLoading => {    
      this.logger.info(`is loading: ${isLoading}`);
      if (isLoading) {
        const obs =  from(this.presentDataLoading());
        this.logger.info(`is loading presented`);
        return obs;
      } else {
        const obs = from(this.dismissLoaderIfPresent());
        this.logger.info(`is loading dismissed`);
        return obs;
      }          
    })).subscribe();

到目前为止,这似乎符合我的要求 - 日志输出

6:54:58 is loading: undefined
6:54:58 is loading dismissed
6:54:58 is loading: true
6:54:58 is loading presented
6:54:58 is loading: undefined
6:54:58 is loading dismissed

一个简单的解决方案

承诺是急切的,所以他们会 运行 即使没有人对结果感兴趣。在将它交给 switchMap 进行管理之前,您的承诺是 运行ning。此外,switchMap 可能无法按预期使用 promises,因为它们无法取消。也许改用 concatMap。

用最小的改变来解决这个问题的方法是推迟你的承诺,所以直到 RxJS 订阅它才会开始。

this.subs.sink = this.store$.select(
  fromApp.isLoading
).pipe(
  concatMap(defer(() => async isLoading => {
    /* Same code as before */        
  }))
).subscribe();

如果您想避免所有这些边缘情况,请停止交错 PromisesObservables,只选择一个。它们确实可以互操作,但它们在设计时并未考虑到这一点。 Observable 是惰性的,可以取消,而 promises 是热切的,不能取消。

如果您曾经在软件商店工作,他们可能会阻止或禁止这种代码风格。这两个异步库之间的切换应保持在最小限度,并在程序的边界内进行 modules/libraries.