Angular - Promise to Observable (RxJS) - 对单个端点的重复 API 调用

Angular - Promise to Observable (RxJS) - repetitive API call to single endpoint

我在 Angular 工作时遇到问题,我正在寻找使用 Observables 的解决方案,而不是我现在使用的解决方案,但使用 Promises(异步/等待)。

端点接受排序和分页参数,即 pageSize 和 page,以检索数据。如果您不发送任何参数,它会以允许的最大项目数(当前为 1000)进行响应。但是项目的总数可以是数万个,也可以低至几个。 API 响应始终如下所示:

{
  items: [...], // (array of objects)
  totalNumberOfItems: 123 // (total number of database entries)
}

目前,我有这样的东西(我省略了 try-catch 和不必要的东西),有承诺:

async getAllItems(): Promise<any[]> {
  let page: number = 1;
  let pageSize: number;
  let items: any[] = [];
  let totalNumberOfItems: number;

  // Initial httpClient call that returns an Observable by default
  let initialResponse = await this.someService.getItems().toPromise();
  items = [...items, ...initialResponse.items];
  pageSize = initialResponse.items.length;
  totalNumberOfItems = initialResponse.totalNumberOfItems;

  // Loop until we get them all
  while (initialResponse.items.length < initialResponse.totalNumberOfItems) {
    page++;
    let nextResponse = await this.someService.getItems({ page, pageSize }).toPromise();
    items = [...items, ...nextResponse.items];
  }

  return items;
}

(或者类似的东西,我凭记忆打字。)

基本上,我不知道如何从最初的 API 调用中正确地进行管道传输(呵呵)。像这样的想法:

interface IResponse {
  items: any[];
  totalNumberOfItems: number;
}

getAllItems(): Observable<IResponse> {
  let page: number = 1;
  let pageSize: number;
  let totalNumberOfItems: number;

  return this.someService.getItems().pipe(
    tap(res => {
      pageSize = res.items.length;
      totalNumberOfItems = res.totalNumberOfItems;
    }),
    // TODO: Now what?
    // Other call using the same getItems() but with parameters, making it iterative
    // and adding it to the result of the initial request?
  );
}

编辑: 基于 by thisdotutkarsh,我已经这样做了,它按预期工作(它可能会更精致,但仍然有效):

getAllItems(): Observable<ItemsResponse> {
  let page: number = 1;
  let pageSize: number;
  let totalNumberOfItems: number;

  return this.service.getItems().pipe(
    tap(response => {
      pageSize = response.items.length;
      totalNumberOfItems = response.totalNumberOfItems;
    }),
    expand(response => {
      if (response.items.length < totalNumberOfItems) {
        const numRepeats = Math.ceil(totalNumberOfItems / pageSize);
        return (page < numRepeats) ? this.service.getItems({ page++, pageSize }) : EMPTY;
      } else {
        return EMPTY;
      }
    }),
    reduce((acc, response) => {
      acc.Items = [...acc.Items, ...response.Items];
      return acc;
    })
  );
}

您可以使用 expandreduce RxJS 运算符实现预期的行为。

expand 运算符递归地将每个源值投影到一个 Observable,然后将其合并到输出 Observable 中。而 reduce 运算符在源 Observable 上应用累加器函数,并且 returns 源完成时的累加结果。

getAllItems() {
let page: number = 1;
let pageSize: number;
let totalNumberOfItems: number;

return this.service.getItems().pipe(
  tap(response => {
    totalNumberOfItems = response.totalNumberOfItems; /* The tap operator performs side-effect once per subscribe */
  }),
  expand((response) => {
    pageSize += response.items.length ? response.items.length : 0;

    return pageSize <= totalNumberOfItems ? this.service.getItems(page++, pageSize) : Observable.empty();
  }),
  reduce((acc, response) => {
    return acc.concat(response.items);
  }, [])
  .catch(error => console.log(error))
  .subscribe((iResponse)) => {
    ...
  });
});
}