sequencing/queuing 使用 rxjs 的 http 请求批处理

sequencing/queuing http request batches with rxjs

我正在尝试弄清楚如何使用 RxJs 对传出 api 请求批次进行排队。

我在 this sample angular app 上的目标是同时发出 3 个“random-cat-facts”API 请求,只有在它们解决后才发出 2 个额外的“random-activity”请求:

import { Component } from "@angular/core";
import { HttpClient } from "@angular/common/http";
import { forkJoin, concat } from "rxjs";
import { combineAll } from "rxjs/operators";

const RANDOM_CAT_FACTS_API = "https://catfact.ninja/fact";
const RANDOM_ACTIVITY_API = "https://www.boredapi.com/api/activity";

@Component({
  selector: "app-root",
  templateUrl: "./app.component.html",
  styleUrls: ["./app.component.css"]
})
export class AppComponent {
  constructor(private http: HttpClient) {}

  fetchSequential() {
    const catFactsObservables = [
      this.http.get(RANDOM_CAT_FACTS_API),
      this.http.get(RANDOM_CAT_FACTS_API),
      this.http.get(RANDOM_CAT_FACTS_API)
    ];

    const activitiesObservables = [
      this.http.get(RANDOM_ACTIVITY_API),
      this.http.get(RANDOM_ACTIVITY_API)
    ];

    const sequencedObservables = [
      forkJoin(catFactsObservables),
      forkJoin(activitiesObservables)
    ];

    concat(sequencedObservables)
      .pipe(combineAll())
      .subscribe((p) => console.log({ p }));
  }
}

我显然做错了什么,因为目前所有 5 个请求都是同时发出的。

如何使用 RxJs 以便仅在 catFactsObservables 请求得到解决后发送 activitiesObservables 请求?

此外,有没有办法让 subscribe 回调仅在两个批次都已解决后触发 - 从而包含所有 5 个请求的响应而不是为每个批次触发单独的事件?

沙盒:

https://codesandbox.io/s/mystifying-gauss-cfcj0z?file=/src/app/app.component.ts:700-721

您要做的是使用 concatAll 运算符而不是 combineAll 运算符。

这样,第二个 Observable 将不会在第一个 Observable 之前发出。

您的代码如下所示:

concat(sequencedObservables)
      .pipe(concatAll())
      .subscribe((p) => console.log({ p }));

最后,我根据@gil 的回答和 使用 concatAllreduce 的组合解决了这个问题。

    concat(sequencedObservables)
      .pipe(
        concatAll(),
        reduce((acc, res) => {
          acc.push(res);
          return acc;
        }, [])
      )
      .subscribe((p) => console.log(p));