RxJS 和 Angular HttpClient:使用 forkJoin 处理多个顺序请求
RxJS and Angular HttpClient: use of forkJoin for multiple sequential requests
在 RxJS (ES6) 中,我试图在单个 Observable 中的一系列连续操作中获得结果。
我不确定我是否必须使用 forkJoin(但我希望操作按顺序执行)或 concat 运算符(但我希望在所有操作结束时收到通知已执行)。
我试过了:
forkJoin
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
return forkJoin(batch);
})
);
}
连接
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
return concat(batch);
})
);
}
在这两种情况下,我都看不到来自 HttpClient
的可观察对象正在执行它们的工作(没有发送 http 请求),但我可以看到日志记录。
批处理中调用的方法如下:
updateProduct(product: Product) {
console.log('calling...');
const options = this.getDefaultRequestOptions();
return this.http.post('update_product', product, options);
}
我调用 sync() 函数如下:
this.productService.sync().subscribe(() => {
console.log('sync done');
}, error => this.utils.handleError(error));
输出:
(8) calling...
sync done
但没有 HTTP 请求开始。
如果我在管道图中的 forkJoin/concat 之外执行相同的操作,我可以看到发送的 HTTP 请求。
sync(): Observable<any> {
const product = new Product(null, 'Title', 'Desc.', 'CODE');
return this.api.updateProduct(product);
}
我错过了什么?
---更新-解决方案---
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
flatMap(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
console.log(batch);
return concat(...batch);
// return forkJoin(batch);
})
);
您的问题不在于 forkJoin/concat
,而是使用 map
到 return 另一个可观察值。
在您的代码中,map
会 return Observable<Observable<any>>
。您的 IDE 应该突出显示这些东西(不要使用 any
,可能这里有问题)。
您的解决方案是将 map
更改为 concatMap
(至少用于测试),然后下一个 chain 应该可以工作。
此外,调试 subscribe
及其 return 值以查看您拥有的内容。
尝试
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
const batch: Observable<any>[] = [];
for (const product of products) {
batch.push(this.api.updateProduct(product));
}
return batch;
}),
switchMap(batch => forkJoin(batch)),
)
}
这是假设 this.db.getProducts()
是同步静态数据而不是 observable/asynchronous 数据。
那你可以试试
this.productService.sync().subscribe(() => { console.log('sync done'); });
并查看是否有任何 API 调用。
在 RxJS (ES6) 中,我试图在单个 Observable 中的一系列连续操作中获得结果。
我不确定我是否必须使用 forkJoin(但我希望操作按顺序执行)或 concat 运算符(但我希望在所有操作结束时收到通知已执行)。
我试过了:
forkJoin
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
return forkJoin(batch);
})
);
}
连接
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
return concat(batch);
})
);
}
在这两种情况下,我都看不到来自 HttpClient
的可观察对象正在执行它们的工作(没有发送 http 请求),但我可以看到日志记录。
批处理中调用的方法如下:
updateProduct(product: Product) {
console.log('calling...');
const options = this.getDefaultRequestOptions();
return this.http.post('update_product', product, options);
}
我调用 sync() 函数如下:
this.productService.sync().subscribe(() => {
console.log('sync done');
}, error => this.utils.handleError(error));
输出:
(8) calling...
sync done
但没有 HTTP 请求开始。
如果我在管道图中的 forkJoin/concat 之外执行相同的操作,我可以看到发送的 HTTP 请求。
sync(): Observable<any> {
const product = new Product(null, 'Title', 'Desc.', 'CODE');
return this.api.updateProduct(product);
}
我错过了什么?
---更新-解决方案---
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
flatMap(products => {
if ( !products ) {
return of(true);
}
const batch: Observable<any>[] = [];
for ( const product of products ) {
if ( product.toBeSync ) {
batch.push(this.api.updateProduct(product));
}
}
console.log(batch);
return concat(...batch);
// return forkJoin(batch);
})
);
您的问题不在于 forkJoin/concat
,而是使用 map
到 return 另一个可观察值。
在您的代码中,map
会 return Observable<Observable<any>>
。您的 IDE 应该突出显示这些东西(不要使用 any
,可能这里有问题)。
您的解决方案是将 map
更改为 concatMap
(至少用于测试),然后下一个 chain 应该可以工作。
此外,调试 subscribe
及其 return 值以查看您拥有的内容。
尝试
sync(): Observable<any> {
return from(this.db.getProducts()).pipe(
map(products => {
const batch: Observable<any>[] = [];
for (const product of products) {
batch.push(this.api.updateProduct(product));
}
return batch;
}),
switchMap(batch => forkJoin(batch)),
)
}
这是假设 this.db.getProducts()
是同步静态数据而不是 observable/asynchronous 数据。
那你可以试试
this.productService.sync().subscribe(() => { console.log('sync done'); });
并查看是否有任何 API 调用。