使用 Reactive Extensions 定期保存更​​改

Periodic saving of changes using Reactive Extensions

我有一个 Angular 应用程序,它有一个非常大的 mat-table。这个table的大部分单元格都是mat-select的组件(来自Angular Material),当mat-select中的一个值发生变化时,需要保存,但是按照下面的规则:

  1. 每 7 秒保存一次自上次保存以来所做的所有更改。
  2. 如果一行被认为是完整的,那一刻触发保存,包含自上次保存以来所做的所有更改。

我的攻击计划是这样的:

  1. 在任何 mat-select 的任何 selectionChange 上,将整个数据对象发送到 Subject
  2. 通过以下管道订阅 Subject,运行:
    • 根据上述条件缓冲可观察流
    • 过滤掉空数组(缓冲区中没有发出任何内容 window)
    • Post整个服务器数组的变化。服务器知道如何处理它。
  3. 缓冲区条件本身是一个 Observable,它会在缓冲区发出任何东西时触发缓冲区。
    • 我希望它在 7 秒过去或数据行完成时发出。所以我使用 combineLatest 组合两个 Observables,一个 interval(7000) 和要保存的数据 Subject,过滤后只获取完整的数据;并且只取任一个发出的最后一个。

这是我所做的(简化):

my.component.html

  <mat-table>
  <ng-container matColumnDef='someData'>
    <mat-header-cell *matHeaderCellDef>Some Data</mat-header-cell>
    <mat-cell *matCellDef='let row'>
      <mat-form-field>
        <mat-select [(ngModel)]='row.someData' (selectionChange)='dataToSave.next(row)'>
          <mat-option [value]='3'>High</mat-option>
          <mat-option [value]='2'>Medium</mat-option>
          <mat-option [value]='1'>Low</mat-option>
        </mat-select>
      </mat-form-field>
    </mat-cell>
  </ng-container>
  ...
</mat-table>

my.component.ts

import { Component, OnInit, AfterViewInit, OnDestroy } from '@angular/core';
import { DataService } from '../service/data.service';
import { Subject } from 'rxjs/Subject';
import { filter, switchMap, buffer, startWith } from 'rxjs/operators';
import { interval } from 'rxjs/observable/interval';
import { combineLatest } from 'rxjs/observable/combineLatest';

@Component({
  selector: 'app-data-grid',
  templateUrl: './data-grid.component.html',
  styleUrls: ['./data-grid.component.scss']
})
export class DataGridComponent implements OnInit, AfterViewInit, OnDestroy {
  dataToSave = new Subject<any>();

  // trigger the buffer either when 7 seconds has passed, or a row has been completed.
  bufferCondition = combineLatest(
    interval(7000).pipe(
      startWith(0)
    ),
    this.dataToSave.pipe(
      startWith({}),
      filter(row => this.checkAllFieldsCompleted(row))
    )
  );

  constructor(private dataService: DataService) { }

  ngAfterViewInit() {
    this.dataToSave.pipe(
      buffer(this.bufferCondition),
      filter(dataArray => data && dataArray.length > 0),
      switchMap(dataArray => this.dataService.saveData(dataArray))
    ).subscribe(dataSaved => { 
      /* success */ 
    }, err => { 
      /* error */
    });
  }

  checkAllFieldsCompleted(row: any): boolean {
    return /* logic to check if complete */
  }
    ...
}

我认为这将是响应式扩展的完美使用!但我似乎无法让它发挥作用。如果我单独使用 bufferTime 运算符,一切正常。所以我认为错误在于我定义自定义缓冲区条件的方式。

经过一些挖掘和大量的反复试验,我有了自己的解决方案。

this website得知

Be aware that combineLatest will not emit an initial value until each observable emits at least one value.

在我的缓冲区条件下,我试图合并两个可观察值:7 秒间隔,以及一行完成的时间。我注意到预期的行为仅在 我第一次完成一行后才开始 - 只有到那时它才会开始每 7 秒保存一次。问题是 dataToSave 可观察对象上的 filter 运算符。 combineLatest 直到 both observables 都发出了一些东西。

我更改了这段代码:

bufferCondition = combineLatest(
  interval(7000).pipe(
    startWith(0)
  ),
  this.dataToSave.pipe(
    startWith({}),
    filter(row => this.checkAllFieldsCompleted(row))
  )
);

对此:

bufferCondition = combineLatest(
  interval(7000),
  this.dataToSave.pipe(
    filter((row, index) => index === 0 || this.checkAllFieldsCompleted(row))
  )
);

而且效果很好!基本上,我不需要 startWith 运算符,并且在 filter 上,我让那个 observable 发出对数据的第一个更改,但在一行完成之前没有其他更改。