使用rxjsAngular和ngrx同步处理Observable防止过多HTTP请求超时
Synchronously process Observable to prevent too many HTTP requests from timing out using rxjs Angular and ngrx
我有以下代码。它获取一个患者数组并构建一个行对象 this.rows
,我在 angular 4 组件前端的 table 中显示该对象(我也在使用 rxjs 5.5)。
我的问题是每行的 hasAlert 属性 都是通过调用 hasAlerts() 分配的。在 HasAlerts 方法中,我通过 this.dataService.fetchItems<Observation>(
为每位患者发出了一个 http 请求。
当患者较多时,会异步发生过多的HTTP请求,从HasAlerts()开始失败(超时)。有没有办法限制这种情况,或者一次从 hasAlerts() 处理一个 Observable?
以下是解决此问题的可能方法
- 我认为 concatMap 可能会有帮助,但我不确定如何使用它。还应该使用它来处理每个 patient/row 还是应该尝试将每个 hasAlert() observable 聚合到一个列表中,然后尝试使用 concatMap 一次处理一个?
- 一个变通方法是我在注释中添加的 patientsAlertsProcessed 变量包装 // 已完成以限制 hasAlertsMethod。我将其限制为仅命中该方法 15 次以避免超时问题。这并不理想,因为一旦这 15 个异步请求完成,我就无法再次启动它。
代码如下
ngOnInit(): void {
this.store.dispatch(new patients.Load([]));
this.patients$ = this.store.select(fromPatients.getAll);
var patientsAlertsProcessed =0;
this.patients$.debounceTime(2000).map(p =>{//Emits Observable<Patient[]>s, the debounceTime is to take the last one as it builds up
this.rows = p.map(pat => {// p = Patient[], iterate through the array of patients
var observations=0;
var rowX= {
username: pat.username,
id: pat.id,
hasAlert:false,
};
if (patientsAlertsProcessed<15){// this is done to throttle the HasAlerts Method
this.hasAlerts(pat).do(x => {
observations++;
if (observations>0)
{
rowX.hasAlert=true;
}
}).subscribe();
patientsAlertsProcessed++;
}
return rowX;
});
}).subscribe(
()=> { },
()=> {
this.table.recalculatePages();
}
);
}
hasAlerts(pat: Patient): Observable<Observation> {
var obs$= this.dataService.fetchItems<Observation>(// this is making an HTTP get request
"Observation",
null,
pat.id
).filter(function (x){
if (x.category.coding[0].code == "GlucoseEvent"){
return true;
}
else{
return false;
}
}
);
return obs$;
}
您可能想按照这些思路尝试一些东西。
const concurrency = 10;
const rowsWithAlerts = [];
this.patients$.debounceTime(2000)
.switchMap(patients => Observable.from(patients)) // patients is of type Patient[]
.mergeMap(patient => {
rowsWithAlerts.push(patient);
this.hasAlerts(patient).do(
hasAlertsResult => {
// here you have hold to both the patient and the result of
// hasAlerts call
patient.hasAlerts = hasAlertsResult;
}}), concurrency)
.subscribe(
() => this.rows = rowsWithAlerts;
)
这里的关键是使用 mergeMap
运算符并将并发级别设置为一个值,在此示例中为 10,但显然可以是任何值。
这允许限制您同时订阅的可观察对象的数量,在您的情况下意味着限制您进行的 http 调用的数量。
我有以下代码。它获取一个患者数组并构建一个行对象 this.rows
,我在 angular 4 组件前端的 table 中显示该对象(我也在使用 rxjs 5.5)。
我的问题是每行的 hasAlert 属性 都是通过调用 hasAlerts() 分配的。在 HasAlerts 方法中,我通过 this.dataService.fetchItems<Observation>(
为每位患者发出了一个 http 请求。
当患者较多时,会异步发生过多的HTTP请求,从HasAlerts()开始失败(超时)。有没有办法限制这种情况,或者一次从 hasAlerts() 处理一个 Observable?
以下是解决此问题的可能方法
- 我认为 concatMap 可能会有帮助,但我不确定如何使用它。还应该使用它来处理每个 patient/row 还是应该尝试将每个 hasAlert() observable 聚合到一个列表中,然后尝试使用 concatMap 一次处理一个?
- 一个变通方法是我在注释中添加的 patientsAlertsProcessed 变量包装 // 已完成以限制 hasAlertsMethod。我将其限制为仅命中该方法 15 次以避免超时问题。这并不理想,因为一旦这 15 个异步请求完成,我就无法再次启动它。
代码如下
ngOnInit(): void {
this.store.dispatch(new patients.Load([]));
this.patients$ = this.store.select(fromPatients.getAll);
var patientsAlertsProcessed =0;
this.patients$.debounceTime(2000).map(p =>{//Emits Observable<Patient[]>s, the debounceTime is to take the last one as it builds up
this.rows = p.map(pat => {// p = Patient[], iterate through the array of patients
var observations=0;
var rowX= {
username: pat.username,
id: pat.id,
hasAlert:false,
};
if (patientsAlertsProcessed<15){// this is done to throttle the HasAlerts Method
this.hasAlerts(pat).do(x => {
observations++;
if (observations>0)
{
rowX.hasAlert=true;
}
}).subscribe();
patientsAlertsProcessed++;
}
return rowX;
});
}).subscribe(
()=> { },
()=> {
this.table.recalculatePages();
}
);
}
hasAlerts(pat: Patient): Observable<Observation> {
var obs$= this.dataService.fetchItems<Observation>(// this is making an HTTP get request
"Observation",
null,
pat.id
).filter(function (x){
if (x.category.coding[0].code == "GlucoseEvent"){
return true;
}
else{
return false;
}
}
);
return obs$;
}
您可能想按照这些思路尝试一些东西。
const concurrency = 10;
const rowsWithAlerts = [];
this.patients$.debounceTime(2000)
.switchMap(patients => Observable.from(patients)) // patients is of type Patient[]
.mergeMap(patient => {
rowsWithAlerts.push(patient);
this.hasAlerts(patient).do(
hasAlertsResult => {
// here you have hold to both the patient and the result of
// hasAlerts call
patient.hasAlerts = hasAlertsResult;
}}), concurrency)
.subscribe(
() => this.rows = rowsWithAlerts;
)
这里的关键是使用 mergeMap
运算符并将并发级别设置为一个值,在此示例中为 10,但显然可以是任何值。
这允许限制您同时订阅的可观察对象的数量,在您的情况下意味着限制您进行的 http 调用的数量。