sequencing/queuing 使用 rxjs 的 http 请求批处理
sequencing/queuing http request batches with rxjs
我正在尝试弄清楚如何使用 RxJs 对传出 api 请求批次进行排队。
我在 this sample angular app 上的目标是同时发出 3 个“random-cat-facts”API 请求,只有在它们解决后才发出 2 个额外的“random-activity”请求:
import { Component } from "@angular/core";
import { HttpClient } from "@angular/common/http";
import { forkJoin, concat } from "rxjs";
import { combineAll } from "rxjs/operators";
const RANDOM_CAT_FACTS_API = "https://catfact.ninja/fact";
const RANDOM_ACTIVITY_API = "https://www.boredapi.com/api/activity";
@Component({
selector: "app-root",
templateUrl: "./app.component.html",
styleUrls: ["./app.component.css"]
})
export class AppComponent {
constructor(private http: HttpClient) {}
fetchSequential() {
const catFactsObservables = [
this.http.get(RANDOM_CAT_FACTS_API),
this.http.get(RANDOM_CAT_FACTS_API),
this.http.get(RANDOM_CAT_FACTS_API)
];
const activitiesObservables = [
this.http.get(RANDOM_ACTIVITY_API),
this.http.get(RANDOM_ACTIVITY_API)
];
const sequencedObservables = [
forkJoin(catFactsObservables),
forkJoin(activitiesObservables)
];
concat(sequencedObservables)
.pipe(combineAll())
.subscribe((p) => console.log({ p }));
}
}
我显然做错了什么,因为目前所有 5 个请求都是同时发出的。
如何使用 RxJs 以便仅在 catFactsObservables
请求得到解决后发送 activitiesObservables
请求?
此外,有没有办法让 subscribe
回调仅在两个批次都已解决后触发 - 从而包含所有 5 个请求的响应而不是为每个批次触发单独的事件?
沙盒:
https://codesandbox.io/s/mystifying-gauss-cfcj0z?file=/src/app/app.component.ts:700-721
您要做的是使用 concatAll 运算符而不是 combineAll 运算符。
这样,第二个 Observable 将不会在第一个 Observable 之前发出。
您的代码如下所示:
concat(sequencedObservables)
.pipe(concatAll())
.subscribe((p) => console.log({ p }));
最后,我根据@gil 的回答和 使用 concatAll
和 reduce
的组合解决了这个问题。
concat(sequencedObservables)
.pipe(
concatAll(),
reduce((acc, res) => {
acc.push(res);
return acc;
}, [])
)
.subscribe((p) => console.log(p));
我正在尝试弄清楚如何使用 RxJs 对传出 api 请求批次进行排队。
我在 this sample angular app 上的目标是同时发出 3 个“random-cat-facts”API 请求,只有在它们解决后才发出 2 个额外的“random-activity”请求:
import { Component } from "@angular/core";
import { HttpClient } from "@angular/common/http";
import { forkJoin, concat } from "rxjs";
import { combineAll } from "rxjs/operators";
const RANDOM_CAT_FACTS_API = "https://catfact.ninja/fact";
const RANDOM_ACTIVITY_API = "https://www.boredapi.com/api/activity";
@Component({
selector: "app-root",
templateUrl: "./app.component.html",
styleUrls: ["./app.component.css"]
})
export class AppComponent {
constructor(private http: HttpClient) {}
fetchSequential() {
const catFactsObservables = [
this.http.get(RANDOM_CAT_FACTS_API),
this.http.get(RANDOM_CAT_FACTS_API),
this.http.get(RANDOM_CAT_FACTS_API)
];
const activitiesObservables = [
this.http.get(RANDOM_ACTIVITY_API),
this.http.get(RANDOM_ACTIVITY_API)
];
const sequencedObservables = [
forkJoin(catFactsObservables),
forkJoin(activitiesObservables)
];
concat(sequencedObservables)
.pipe(combineAll())
.subscribe((p) => console.log({ p }));
}
}
我显然做错了什么,因为目前所有 5 个请求都是同时发出的。
如何使用 RxJs 以便仅在 catFactsObservables
请求得到解决后发送 activitiesObservables
请求?
此外,有没有办法让 subscribe
回调仅在两个批次都已解决后触发 - 从而包含所有 5 个请求的响应而不是为每个批次触发单独的事件?
沙盒:
https://codesandbox.io/s/mystifying-gauss-cfcj0z?file=/src/app/app.component.ts:700-721
您要做的是使用 concatAll 运算符而不是 combineAll 运算符。
这样,第二个 Observable 将不会在第一个 Observable 之前发出。
您的代码如下所示:
concat(sequencedObservables)
.pipe(concatAll())
.subscribe((p) => console.log({ p }));
最后,我根据@gil 的回答和 concatAll
和 reduce
的组合解决了这个问题。
concat(sequencedObservables)
.pipe(
concatAll(),
reduce((acc, res) => {
acc.push(res);
return acc;
}, [])
)
.subscribe((p) => console.log(p));