如何在 RxJs Observable 构造函数中使用异步调用

How to use an async calling inside of an RxJs Observable constructor

我正在使用 NgRx 在应用程序中设置 effect

在效果期间,我需要获取一些存储值,做一些逻辑,然后根据我的逻辑调度一个或多个动作 运行.

所以,为了更容易制作,我正在使用 dispatch: false 的效果。

public getCachedOrServerValue$ = createEffect(() =>
    this.actions$.pipe(
      ofType(myActions.getCachedOrServerValue),
      switchMap((_) => this.getCachedOrServerValue())
    ), { dispatch: false }
);

所以在上面我会调用getCachedOrServerValue。我想使用 switchMap(而不仅仅是 tap)所以如果我在前一个完成之前得到一个新的 myActions.getCachedOrServerValue,它将取消之前的订阅(根据我对 switchMap 的理解) .

我们调用的函数如下所示:

private getCachedOrServerValue(): Observable<void> {
  const result$ = new Observable<void>(async (subscriber) => { // <---- I need this to be async so I can use await
    // First see if we can get it from the store
    const cached = await this.storeUtils.getStoreValue(fromApp.getValue);
    if (cached) {
      this.store$.dispatch(myActions.getCachedOrServerValueSuccess(cached));
      subscriber.next();
      return;
    }

    // Need to request from the server
    const name = await this.storeUtils.getStoreValue(fromApp.getSiteName);

    // storeUtils.getStoreValue is just a helper we have to get a single value as a Promise
    const valueId = await this.storeUtils.getStoreValue(fromApp.getValueId);
    const serverValue = await this.myService.getValue(namevalueId);
    this.store$.dispatch(myActions.storeValue(serverValue));
    this.store$.dispatch(myActions.getCachedOrServerValueSuccess(cached));
  });
  return result$;
}

首先,我封装了一个 observable,据我所知,我们需要一个 observable 才能在 switchMap.

中使用此调用

其次(我的实际问题),我希望能够在 Observable subscriber 构造函数参数中使用 await,但是 VSCode 报错。

错误详情:

Argument of type '(this: Observable, subscriber: Subscriber) => Promise' is not assignable to parameter of type '(this: Observable, subscriber: Subscriber) => TeardownLogic'. Type 'Promise' is not assignable to type 'TeardownLogic'. Property 'unsubscribe' is missing in type 'Promise' but required in type 'Unsubscribable'.ts(2345)

types.d.ts(31, 5): 'unsubscribe' 在这里声明。

这里有什么问题,如何使用如上所示的 aysnc/await

更新

根据@StPaulis 的回答,我可以避免这样做(至少在这种情况下)。

我可以像下面那样做(还没有测试,但肯定没问题):

public getCachedOrServerValue$ = createEffect(() =>
  this.actions$.pipe(
    ofType(myActions.getCachedOrServerValue),
    switchMap(_ => from(this.getCachedOrServerValue()).pipe( // <-- use a from here 
         map(data => myActions.getCachedOrServerValueSuccess(data)),
         catchError(error => of(myActions.getCachedOrServerValueFail(error)))
))));

private async getCachedOrServerValue(): Promise<DataState>{
 // First see if we can get it from the store
 const cached = await this.storeUtils.getStoreValue(fromApp.getValue);
 if (cached) {
    return cached;
 }

 // Need to request from the server
 const name = await this.storeUtils.getStoreValue(fromApp.getName);
 const valueId = await this.storeUtils.getStoreValue(fromApp.getValueId);
 const serverValue = await this.myService.getValue(name, valueId);
 return serverValue;
}

在我看来,将 observables 与 promises 混合从来都不是一个好主意。

我建议您使用 RxJs | from 将您的承诺转换为一个可观察对象并在您的管道中使用它,而不是创建一个新的可观察对象并将所有逻辑包装在里面。

RxJs 旨在取代(异步 | 等待)功能。