return 来自单个 Observable 的同步和异步值的混合

return mix of sync and async values from single Observable

我有一个已获取对象的缓存。我的函数 getListByIds 应该 return Observable<Model>。在这个函数中,我需要检查缓存中是否存在对象(通过 id 检查),如果不存在,则从后端获取它。使用 async/await 的代码可以是:

public async getListByIds(ids: number[]): Promise<Model> {
  return ids.map(id => {
    let cached = this.cacheService.get(id); // may return undefined
    if (!cached) cached = await this.http.get(baseUrl + '/get/' + id).toPromise();
    return cached;
  });
}

我需要将其转换为 RxJS - to Promise() 已弃用,我想学习 RxJs。 我不想在里面使用 subscribe()

编辑

假设我想在缓存中存储现成的模型,并且 API 有需要解析的纯 JOSN 数据。

public async getListByIds(ids: number[]): Promise<Model> {
  return ids.map(id => {
    let cached = this.cacheService.get(id); // may return undefined
    if (!cached) {
      const apiData = await this.http.get(baseUrl + '/get/' + id).toPromise();
      cached = this.parseApiData(apiData);
    }
    return cached;
  });
}

我应该去哪里打电话 parseApiData()

该函数看起来像这样:

import { forkJoin } from "rxjs/observable/forkJoin";
import { of } from "rxjs/observable/of";

public getListByIds(ids: number[]): Observable<Model> {
  return forkJoin(ids.map(
    (id) => {
      let cached = this.cacheService.get(id)
      if (!cached) return this.http.get(baseUrl + '/get/' + id).pipe(
        map(apiData => this.parseApiData(apiData))
      )
      return of(cached)
    }
  )
}

forkJoin 等待所有映射的 Observable 完成。

这里有两种不同的方法,具体取决于您是否希望 observable 发出流数组。

// A Stream of Models
public getListByIds(ids: number[]): Observable<Model> {
  return from(ids).pipe(
    map(id =>  [id, this.cacheService.get(id)]),
    mergeMap(([id, cached]) => !!cached ? 
      of(cached) : 
      this.http.get(baseUrl + '/get/' + id)
    )
  );
}

// An Array of Models
public getListByIds(ids: number[]): Observable<Model[]> {
  return forkJoin(ids
    .map(id =>  [id, this.cacheService.get(id)])
    .map(([id, cached]) => !!cached ? 
      of(cached) : 
      this.http.get(baseUrl + '/get/' + id)
    )
  );
}

为了完整起见,您可以使用 toArray 将第一个转换为第二个。

// An Array of Models
public getListByIds(ids: number[]): Observable<Model[]> {
  return from(ids).pipe(
    map(id =>  [id, this.cacheService.get(id)]),
    mergeMap(([id, cached]) => !!cached ? 
      of(cached) : 
      this.http.get(baseUrl + '/get/' + id)
    ),
    toArray()
  );
}