使用 Reactive Extensions 定期保存更改
Periodic saving of changes using Reactive Extensions
我有一个 Angular 应用程序,它有一个非常大的 mat-table
。这个table的大部分单元格都是mat-select
的组件(来自Angular Material),当mat-select
中的一个值发生变化时,需要保存,但是按照下面的规则:
- 每 7 秒保存一次自上次保存以来所做的所有更改。
- 如果一行被认为是完整的,那一刻触发保存,包含自上次保存以来所做的所有更改。
我的攻击计划是这样的:
- 在任何
mat-select
的任何 selectionChange
上,将整个数据对象发送到 Subject
。
- 通过以下管道订阅
Subject
,运行:
- 根据上述条件缓冲可观察流
- 过滤掉空数组(缓冲区中没有发出任何内容 window)
- Post整个服务器数组的变化。服务器知道如何处理它。
- 缓冲区条件本身是一个 Observable,它会在缓冲区发出任何东西时触发缓冲区。
- 我希望它在 7 秒过去或数据行完成时发出。所以我使用
combineLatest
组合两个 Observables,一个 interval(7000)
和要保存的数据 Subject
,过滤后只获取完整的数据;并且只取任一个发出的最后一个。
这是我所做的(简化):
my.component.html
<mat-table>
<ng-container matColumnDef='someData'>
<mat-header-cell *matHeaderCellDef>Some Data</mat-header-cell>
<mat-cell *matCellDef='let row'>
<mat-form-field>
<mat-select [(ngModel)]='row.someData' (selectionChange)='dataToSave.next(row)'>
<mat-option [value]='3'>High</mat-option>
<mat-option [value]='2'>Medium</mat-option>
<mat-option [value]='1'>Low</mat-option>
</mat-select>
</mat-form-field>
</mat-cell>
</ng-container>
...
</mat-table>
my.component.ts
import { Component, OnInit, AfterViewInit, OnDestroy } from '@angular/core';
import { DataService } from '../service/data.service';
import { Subject } from 'rxjs/Subject';
import { filter, switchMap, buffer, startWith } from 'rxjs/operators';
import { interval } from 'rxjs/observable/interval';
import { combineLatest } from 'rxjs/observable/combineLatest';
@Component({
selector: 'app-data-grid',
templateUrl: './data-grid.component.html',
styleUrls: ['./data-grid.component.scss']
})
export class DataGridComponent implements OnInit, AfterViewInit, OnDestroy {
dataToSave = new Subject<any>();
// trigger the buffer either when 7 seconds has passed, or a row has been completed.
bufferCondition = combineLatest(
interval(7000).pipe(
startWith(0)
),
this.dataToSave.pipe(
startWith({}),
filter(row => this.checkAllFieldsCompleted(row))
)
);
constructor(private dataService: DataService) { }
ngAfterViewInit() {
this.dataToSave.pipe(
buffer(this.bufferCondition),
filter(dataArray => data && dataArray.length > 0),
switchMap(dataArray => this.dataService.saveData(dataArray))
).subscribe(dataSaved => {
/* success */
}, err => {
/* error */
});
}
checkAllFieldsCompleted(row: any): boolean {
return /* logic to check if complete */
}
...
}
我认为这将是响应式扩展的完美使用!但我似乎无法让它发挥作用。如果我单独使用 bufferTime
运算符,一切正常。所以我认为错误在于我定义自定义缓冲区条件的方式。
经过一些挖掘和大量的反复试验,我有了自己的解决方案。
从this website得知
Be aware that combineLatest
will not emit an initial value until each observable emits at least one value.
在我的缓冲区条件下,我试图合并两个可观察值:7 秒间隔,以及一行完成的时间。我注意到预期的行为仅在 我第一次完成一行后才开始 - 只有到那时它才会开始每 7 秒保存一次。问题是 dataToSave
可观察对象上的 filter
运算符。 combineLatest
直到 both observables 都发出了一些东西。
我更改了这段代码:
bufferCondition = combineLatest(
interval(7000).pipe(
startWith(0)
),
this.dataToSave.pipe(
startWith({}),
filter(row => this.checkAllFieldsCompleted(row))
)
);
对此:
bufferCondition = combineLatest(
interval(7000),
this.dataToSave.pipe(
filter((row, index) => index === 0 || this.checkAllFieldsCompleted(row))
)
);
而且效果很好!基本上,我不需要 startWith
运算符,并且在 filter
上,我让那个 observable 发出对数据的第一个更改,但在一行完成之前没有其他更改。
我有一个 Angular 应用程序,它有一个非常大的 mat-table
。这个table的大部分单元格都是mat-select
的组件(来自Angular Material),当mat-select
中的一个值发生变化时,需要保存,但是按照下面的规则:
- 每 7 秒保存一次自上次保存以来所做的所有更改。
- 如果一行被认为是完整的,那一刻触发保存,包含自上次保存以来所做的所有更改。
我的攻击计划是这样的:
- 在任何
mat-select
的任何selectionChange
上,将整个数据对象发送到Subject
。 - 通过以下管道订阅
Subject
,运行:- 根据上述条件缓冲可观察流
- 过滤掉空数组(缓冲区中没有发出任何内容 window)
- Post整个服务器数组的变化。服务器知道如何处理它。
- 缓冲区条件本身是一个 Observable,它会在缓冲区发出任何东西时触发缓冲区。
- 我希望它在 7 秒过去或数据行完成时发出。所以我使用
combineLatest
组合两个 Observables,一个interval(7000)
和要保存的数据Subject
,过滤后只获取完整的数据;并且只取任一个发出的最后一个。
- 我希望它在 7 秒过去或数据行完成时发出。所以我使用
这是我所做的(简化):
my.component.html
<mat-table>
<ng-container matColumnDef='someData'>
<mat-header-cell *matHeaderCellDef>Some Data</mat-header-cell>
<mat-cell *matCellDef='let row'>
<mat-form-field>
<mat-select [(ngModel)]='row.someData' (selectionChange)='dataToSave.next(row)'>
<mat-option [value]='3'>High</mat-option>
<mat-option [value]='2'>Medium</mat-option>
<mat-option [value]='1'>Low</mat-option>
</mat-select>
</mat-form-field>
</mat-cell>
</ng-container>
...
</mat-table>
my.component.ts
import { Component, OnInit, AfterViewInit, OnDestroy } from '@angular/core';
import { DataService } from '../service/data.service';
import { Subject } from 'rxjs/Subject';
import { filter, switchMap, buffer, startWith } from 'rxjs/operators';
import { interval } from 'rxjs/observable/interval';
import { combineLatest } from 'rxjs/observable/combineLatest';
@Component({
selector: 'app-data-grid',
templateUrl: './data-grid.component.html',
styleUrls: ['./data-grid.component.scss']
})
export class DataGridComponent implements OnInit, AfterViewInit, OnDestroy {
dataToSave = new Subject<any>();
// trigger the buffer either when 7 seconds has passed, or a row has been completed.
bufferCondition = combineLatest(
interval(7000).pipe(
startWith(0)
),
this.dataToSave.pipe(
startWith({}),
filter(row => this.checkAllFieldsCompleted(row))
)
);
constructor(private dataService: DataService) { }
ngAfterViewInit() {
this.dataToSave.pipe(
buffer(this.bufferCondition),
filter(dataArray => data && dataArray.length > 0),
switchMap(dataArray => this.dataService.saveData(dataArray))
).subscribe(dataSaved => {
/* success */
}, err => {
/* error */
});
}
checkAllFieldsCompleted(row: any): boolean {
return /* logic to check if complete */
}
...
}
我认为这将是响应式扩展的完美使用!但我似乎无法让它发挥作用。如果我单独使用 bufferTime
运算符,一切正常。所以我认为错误在于我定义自定义缓冲区条件的方式。
经过一些挖掘和大量的反复试验,我有了自己的解决方案。
从this website得知
Be aware that
combineLatest
will not emit an initial value until each observable emits at least one value.
在我的缓冲区条件下,我试图合并两个可观察值:7 秒间隔,以及一行完成的时间。我注意到预期的行为仅在 我第一次完成一行后才开始 - 只有到那时它才会开始每 7 秒保存一次。问题是 dataToSave
可观察对象上的 filter
运算符。 combineLatest
直到 both observables 都发出了一些东西。
我更改了这段代码:
bufferCondition = combineLatest(
interval(7000).pipe(
startWith(0)
),
this.dataToSave.pipe(
startWith({}),
filter(row => this.checkAllFieldsCompleted(row))
)
);
对此:
bufferCondition = combineLatest(
interval(7000),
this.dataToSave.pipe(
filter((row, index) => index === 0 || this.checkAllFieldsCompleted(row))
)
);
而且效果很好!基本上,我不需要 startWith
运算符,并且在 filter
上,我让那个 observable 发出对数据的第一个更改,但在一行完成之前没有其他更改。