使用rxjsAngular和ngrx同步处理Observable防止过多HTTP请求超时

Synchronously process Observable to prevent too many HTTP requests from timing out using rxjs Angular and ngrx

我有以下代码。它获取一个患者数组并构建一个行对象 this.rows,我在 angular 4 组件前端的 table 中显示该对象(我也在使用 rxjs 5.5)。

我的问题是每行的 hasAlert 属性 都是通过调用 hasAlerts() 分配的。在 HasAlerts 方法中,我通过 this.dataService.fetchItems<Observation>( 为每位患者发出了一个 http 请求。

当患者较多时,会异步发生过多的HTTP请求,从HasAlerts()开始失败(超时)。有没有办法限制这种情况,或者一次从 hasAlerts() 处理一个 Observable?

以下是解决此问题的可能方法

  1. 我认为 concatMap 可能会有帮助,但我不确定如何使用它。还应该使用它来处理每个 patient/row 还是应该尝试将每个 hasAlert() observable 聚合到一个列表中,然后尝试使用 concatMap 一次处理一个?
  2. 一个变通方法是我在注释中添加的 patientsAlertsProcessed 变量包装 // 已完成以限制 hasAlertsMethod。我将其限制为仅命中该方法 15 次以避免超时问题。这并不理想,因为一旦这 15 个异步请求完成,我就无法再次启动它。

代码如下

ngOnInit(): void {
    this.store.dispatch(new patients.Load([]));

    this.patients$ = this.store.select(fromPatients.getAll); 
    var patientsAlertsProcessed =0;
    this.patients$.debounceTime(2000).map(p =>{//Emits Observable<Patient[]>s, the debounceTime is to take the last one as it builds up
      this.rows = p.map(pat => {// p = Patient[], iterate through the array of patients
        var observations=0;
        var rowX= {
          username: pat.username,
          id: pat.id,
          hasAlert:false, 
        };

        if (patientsAlertsProcessed<15){// this is done to throttle the HasAlerts Method
          this.hasAlerts(pat).do(x => {
            observations++;
            if (observations>0)
            {
              rowX.hasAlert=true;
            }
          }).subscribe();
          patientsAlertsProcessed++;
        }
        return rowX;
      });
    }).subscribe(
       ()=> { },
       ()=> {
        this.table.recalculatePages();
      }
    );

  }
  hasAlerts(pat: Patient): Observable<Observation> {
    var obs$= this.dataService.fetchItems<Observation>(// this is making an HTTP get request 
        "Observation",
        null,
        pat.id
      ).filter(function (x){
        if (x.category.coding[0].code == "GlucoseEvent"){ 
            return true;
        }
        else{
          return false; 
        }
      }
    );
    return obs$;
  }

您可能想按照这些思路尝试一些东西。

const concurrency = 10; 
const rowsWithAlerts = [];

this.patients$.debounceTime(2000)
.switchMap(patients => Observable.from(patients)) // patients is of type Patient[]
.mergeMap(patient => {
   rowsWithAlerts.push(patient);
   this.hasAlerts(patient).do(
   hasAlertsResult => {
     // here you have hold to both the patient and the result of
     // hasAlerts call
     patient.hasAlerts = hasAlertsResult;
   }}), concurrency)
.subscribe(
   () => this.rows = rowsWithAlerts;
)

这里的关键是使用 mergeMap 运算符并将并发级别设置为一个值,在此示例中为 10,但显然可以是任何值。

这允许限制您同时订阅的可观察对象的数量,在您的情况下意味着限制您进行的 http 调用的数量。