如何使用 RxJS 控制多个 ajax 调用的压力
How to control pressure of multiple ajax calls with RxJS
使用 RxJS 5 我想解决以下问题:
假设我正在从 REST API 中获取类别列表。
基于其中的每一个类别,我想从另一个 REST 端点获取子类别。
然后,基于这些子类别中的每一个,我想获取产品,对于每一个产品,我们需要获取详细描述。
这个我已经解决了。问题是 ajax 调用升级,不到一分钟就发出了超过 30k 次调用,导致服务器崩溃。
现在,由于这是一项夜间工作,只要它成功完成,我可以接受它需要一些时间。
这是我的:
getCategories() // Wraps ajax call and returns payload with array of categories
.switchMap(categories => Observable.from(categories))
.mergeMap(category =>
getSubCategories(category) // Wraps ajax call and returns payload with array of sub categories
.catch(err => {
console.error('Error when fetching sub categories for category:', category);
console.error(err);
return Observable.empty();
})
)
.mergeMap(subCategories => Observable.from(subCategories))
.mergeMap(subCategory =>
getProducts(subCategory) // Wraps ajax call and returns payload with array of products
.catch(err => {
console.error('Error when fetching products for sub category:', subCategory);
console.error(err);
return Observable.empty();
})
)
.mergeMap(products => Observable.from(products))
.mergeMap(product =>
getProductDetails(product) // Wraps ajax call and returns payload with product details
.catch(err => {
console.error('Error when fetching product details for product:', product);
console.error(err);
return Observable.empty();
})
)
.mergeMap(productDetails => saveToDb(productDetails))
.catch(err => {
console.error(err);
})
.subscribe();
在我获取类别的初始请求之后,我想要:
每次获取子类别的调用都应等到前一个完成。获取产品和这些产品详细信息时,一次只能调用 5 ajax 次。在这 5 个调用完成后,我们触发接下来的 5 个调用等等。
或者,它可以通过时间来控制,比如在我们进行下一个 ajax 调用之前等待 x 秒等
根据我上面的示例,我将如何使用 RxJS 很好地解决这个问题?
mergeMap
有一个带有可选并发参数的重载。您可以利用它来控制有多少请求同时发送到您的服务器。
public mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable
.mergeMap(category =>
getSubCategories(category) // Wraps ajax call and returns payload with array of sub categories
.catch(err => {
console.error('Error when fetching sub categories for category:', category);
console.error(err);
return Observable.empty();
}),
null,
5 /* concurrency */
)
还有; mergeMap 会将任何 observableLike 转换为 Observables,因此如果您的 getSubCategories()
returns 一个 Array 将自动转换为 observable,则不需要进一步 Observable.from()
。
使用 RxJS 5 我想解决以下问题:
假设我正在从 REST API 中获取类别列表。
基于其中的每一个类别,我想从另一个 REST 端点获取子类别。
然后,基于这些子类别中的每一个,我想获取产品,对于每一个产品,我们需要获取详细描述。
这个我已经解决了。问题是 ajax 调用升级,不到一分钟就发出了超过 30k 次调用,导致服务器崩溃。
现在,由于这是一项夜间工作,只要它成功完成,我可以接受它需要一些时间。
这是我的:
getCategories() // Wraps ajax call and returns payload with array of categories
.switchMap(categories => Observable.from(categories))
.mergeMap(category =>
getSubCategories(category) // Wraps ajax call and returns payload with array of sub categories
.catch(err => {
console.error('Error when fetching sub categories for category:', category);
console.error(err);
return Observable.empty();
})
)
.mergeMap(subCategories => Observable.from(subCategories))
.mergeMap(subCategory =>
getProducts(subCategory) // Wraps ajax call and returns payload with array of products
.catch(err => {
console.error('Error when fetching products for sub category:', subCategory);
console.error(err);
return Observable.empty();
})
)
.mergeMap(products => Observable.from(products))
.mergeMap(product =>
getProductDetails(product) // Wraps ajax call and returns payload with product details
.catch(err => {
console.error('Error when fetching product details for product:', product);
console.error(err);
return Observable.empty();
})
)
.mergeMap(productDetails => saveToDb(productDetails))
.catch(err => {
console.error(err);
})
.subscribe();
在我获取类别的初始请求之后,我想要:
每次获取子类别的调用都应等到前一个完成。获取产品和这些产品详细信息时,一次只能调用 5 ajax 次。在这 5 个调用完成后,我们触发接下来的 5 个调用等等。
或者,它可以通过时间来控制,比如在我们进行下一个 ajax 调用之前等待 x 秒等
根据我上面的示例,我将如何使用 RxJS 很好地解决这个问题?
mergeMap
有一个带有可选并发参数的重载。您可以利用它来控制有多少请求同时发送到您的服务器。
public mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable
.mergeMap(category =>
getSubCategories(category) // Wraps ajax call and returns payload with array of sub categories
.catch(err => {
console.error('Error when fetching sub categories for category:', category);
console.error(err);
return Observable.empty();
}),
null,
5 /* concurrency */
)
还有; mergeMap 会将任何 observableLike 转换为 Observables,因此如果您的 getSubCategories()
returns 一个 Array 将自动转换为 observable,则不需要进一步 Observable.from()
。