Angular - Promise to Observable (RxJS) - 对单个端点的重复 API 调用
Angular - Promise to Observable (RxJS) - repetitive API call to single endpoint
我在 Angular 工作时遇到问题,我正在寻找使用 Observables 的解决方案,而不是我现在使用的解决方案,但使用 Promises(异步/等待)。
端点接受排序和分页参数,即 pageSize 和 page,以检索数据。如果您不发送任何参数,它会以允许的最大项目数(当前为 1000)进行响应。但是项目的总数可以是数万个,也可以低至几个。 API 响应始终如下所示:
{
items: [...], // (array of objects)
totalNumberOfItems: 123 // (total number of database entries)
}
目前,我有这样的东西(我省略了 try-catch 和不必要的东西),有承诺:
async getAllItems(): Promise<any[]> {
let page: number = 1;
let pageSize: number;
let items: any[] = [];
let totalNumberOfItems: number;
// Initial httpClient call that returns an Observable by default
let initialResponse = await this.someService.getItems().toPromise();
items = [...items, ...initialResponse.items];
pageSize = initialResponse.items.length;
totalNumberOfItems = initialResponse.totalNumberOfItems;
// Loop until we get them all
while (initialResponse.items.length < initialResponse.totalNumberOfItems) {
page++;
let nextResponse = await this.someService.getItems({ page, pageSize }).toPromise();
items = [...items, ...nextResponse.items];
}
return items;
}
(或者类似的东西,我凭记忆打字。)
基本上,我不知道如何从最初的 API 调用中正确地进行管道传输(呵呵)。像这样的想法:
interface IResponse {
items: any[];
totalNumberOfItems: number;
}
getAllItems(): Observable<IResponse> {
let page: number = 1;
let pageSize: number;
let totalNumberOfItems: number;
return this.someService.getItems().pipe(
tap(res => {
pageSize = res.items.length;
totalNumberOfItems = res.totalNumberOfItems;
}),
// TODO: Now what?
// Other call using the same getItems() but with parameters, making it iterative
// and adding it to the result of the initial request?
);
}
编辑:
基于 by thisdotutkarsh,我已经这样做了,它按预期工作(它可能会更精致,但仍然有效):
getAllItems(): Observable<ItemsResponse> {
let page: number = 1;
let pageSize: number;
let totalNumberOfItems: number;
return this.service.getItems().pipe(
tap(response => {
pageSize = response.items.length;
totalNumberOfItems = response.totalNumberOfItems;
}),
expand(response => {
if (response.items.length < totalNumberOfItems) {
const numRepeats = Math.ceil(totalNumberOfItems / pageSize);
return (page < numRepeats) ? this.service.getItems({ page++, pageSize }) : EMPTY;
} else {
return EMPTY;
}
}),
reduce((acc, response) => {
acc.Items = [...acc.Items, ...response.Items];
return acc;
})
);
}
您可以使用 expand
和 reduce
RxJS 运算符实现预期的行为。
expand
运算符递归地将每个源值投影到一个 Observable,然后将其合并到输出 Observable 中。而 reduce
运算符在源 Observable 上应用累加器函数,并且 returns 源完成时的累加结果。
getAllItems() {
let page: number = 1;
let pageSize: number;
let totalNumberOfItems: number;
return this.service.getItems().pipe(
tap(response => {
totalNumberOfItems = response.totalNumberOfItems; /* The tap operator performs side-effect once per subscribe */
}),
expand((response) => {
pageSize += response.items.length ? response.items.length : 0;
return pageSize <= totalNumberOfItems ? this.service.getItems(page++, pageSize) : Observable.empty();
}),
reduce((acc, response) => {
return acc.concat(response.items);
}, [])
.catch(error => console.log(error))
.subscribe((iResponse)) => {
...
});
});
}
我在 Angular 工作时遇到问题,我正在寻找使用 Observables 的解决方案,而不是我现在使用的解决方案,但使用 Promises(异步/等待)。
端点接受排序和分页参数,即 pageSize 和 page,以检索数据。如果您不发送任何参数,它会以允许的最大项目数(当前为 1000)进行响应。但是项目的总数可以是数万个,也可以低至几个。 API 响应始终如下所示:
{
items: [...], // (array of objects)
totalNumberOfItems: 123 // (total number of database entries)
}
目前,我有这样的东西(我省略了 try-catch 和不必要的东西),有承诺:
async getAllItems(): Promise<any[]> {
let page: number = 1;
let pageSize: number;
let items: any[] = [];
let totalNumberOfItems: number;
// Initial httpClient call that returns an Observable by default
let initialResponse = await this.someService.getItems().toPromise();
items = [...items, ...initialResponse.items];
pageSize = initialResponse.items.length;
totalNumberOfItems = initialResponse.totalNumberOfItems;
// Loop until we get them all
while (initialResponse.items.length < initialResponse.totalNumberOfItems) {
page++;
let nextResponse = await this.someService.getItems({ page, pageSize }).toPromise();
items = [...items, ...nextResponse.items];
}
return items;
}
(或者类似的东西,我凭记忆打字。)
基本上,我不知道如何从最初的 API 调用中正确地进行管道传输(呵呵)。像这样的想法:
interface IResponse {
items: any[];
totalNumberOfItems: number;
}
getAllItems(): Observable<IResponse> {
let page: number = 1;
let pageSize: number;
let totalNumberOfItems: number;
return this.someService.getItems().pipe(
tap(res => {
pageSize = res.items.length;
totalNumberOfItems = res.totalNumberOfItems;
}),
// TODO: Now what?
// Other call using the same getItems() but with parameters, making it iterative
// and adding it to the result of the initial request?
);
}
编辑:
基于
getAllItems(): Observable<ItemsResponse> {
let page: number = 1;
let pageSize: number;
let totalNumberOfItems: number;
return this.service.getItems().pipe(
tap(response => {
pageSize = response.items.length;
totalNumberOfItems = response.totalNumberOfItems;
}),
expand(response => {
if (response.items.length < totalNumberOfItems) {
const numRepeats = Math.ceil(totalNumberOfItems / pageSize);
return (page < numRepeats) ? this.service.getItems({ page++, pageSize }) : EMPTY;
} else {
return EMPTY;
}
}),
reduce((acc, response) => {
acc.Items = [...acc.Items, ...response.Items];
return acc;
})
);
}
您可以使用 expand
和 reduce
RxJS 运算符实现预期的行为。
expand
运算符递归地将每个源值投影到一个 Observable,然后将其合并到输出 Observable 中。而 reduce
运算符在源 Observable 上应用累加器函数,并且 returns 源完成时的累加结果。
getAllItems() {
let page: number = 1;
let pageSize: number;
let totalNumberOfItems: number;
return this.service.getItems().pipe(
tap(response => {
totalNumberOfItems = response.totalNumberOfItems; /* The tap operator performs side-effect once per subscribe */
}),
expand((response) => {
pageSize += response.items.length ? response.items.length : 0;
return pageSize <= totalNumberOfItems ? this.service.getItems(page++, pageSize) : Observable.empty();
}),
reduce((acc, response) => {
return acc.concat(response.items);
}, [])
.catch(error => console.log(error))
.subscribe((iResponse)) => {
...
});
});
}