在 Rxjs 中如何暂停一个数据流并从第二个暂停的数据流继续?
In Rxjs How to pause a stream of data and continue from the second It was paused?
我有一个每 10 秒长轮询一次的数据流。长 pol 可以暂停和继续。
问题是:它继续时重新开始。我必须让它从暂停的那一刻继续。
所以如果:
延迟(10000)
暂停于 (7000)
然后当我再次开始长轮询时,我的数据必须在 (10000-7000)=3000
之后拉取
今天是第四天我一直被这个问题困得要疯了..
我创造了这个闪电战
https://stackblitz.com/edit/ang7-working-rxjs-stream-timer-combined?file=src/app/app.component.ts
创建了一个流和一个提取数据的计时器。当流程和计时器停止时,设置剩余的时间以及新的延迟。但是当我按下开始时它仍然不会再等待 3 秒它只是立即调用我不想要的服务
这是一个非常基本的实现,它依赖于 timer
函数和 take
、filter
和 takeUntil
运算符。
我认为这个解决方案不太优雅。它取决于状态变量 paused
和 first
,并且在订阅中的 next
回调中有一个递归触发器。另请注意,它目前不进行任何错误处理。在这种情况下,轮询完全停止。
控制器
export const REFRESH_INTERVAL = 11;
@Component({ ... })
export class AppComponent implements OnDestroy {
pause$ = new Subject();
reset$ = new Subject();
closed$ = new Subject();
counter = REFRESH_INTERVAL;
res: any;
paused = false;
constructor(private freeApiService: freeApiService) {}
ngOnInit() {
this.init();
}
init() {
let first = true;
this.reset$
.pipe(
switchMap(_ =>
timer(0, 1000).pipe(
take(this.counter),
tap(_ => (this.paused = this.paused ? false : this.paused)),
takeUntil(this.pause$),
filter(_ => first || --this.counter === 0),
finalize(
() => (this.paused = this.counter != 0 ? true : this.paused)
),
switchMap(_ => this.freeApiService.getDummy())
)
),
takeUntil(this.closed$)
)
.subscribe({
next: res => {
first = false;
this.res = res;
this.counter = REFRESH_INTERVAL;
this.reset$.next();
},
error: error => console.log("Error fethcing data:", error)
});
}
ngOnDestroy() {
this.closed$.next();
}
}
模板
<div my-app>
<ng-container *ngIf="res; else noRes">
<button (mouseup)="paused ? reset$.next() : pause$.next()">
{{ paused ? 'Resume poll' : 'Pause poll' }}
</button>
<br><br>
Refreshing in: {{ counter }}
<br>
Response:
<pre>{{ res | json }}</pre>
</ng-container>
<ng-template #noRes>
<button (mouseup)="reset$.next()">
Start poll
</button>
</ng-template>
</div>
我修改了你的Stackblitz
我会结合使用 interval
、filter
和 scan
运算符,并将检查频率设置为 1s:
const pauserSubject = new BehaviorSubject(true);
const s$ = interval(1000).pipe(
withLatestFrom(pauserSubject),
filter(([intervalValue, isAllowed]) => isAllowed),
scan(acc => acc + 1, 0),
tap(emissionNumber => console.log('check attempt #', emissionNumber, ' on ', new Date().toTimeString())),
filter(emissionCount => !(emissionCount % 10)),
switchMapTo(of('request/response'))
);
在这种情况下,常规 interval
设置“节拍”,每隔 1 秒检查一次暂停是真还是假。取消暂停后,相应的 filter
运算符允许继续。 scan
验证每 10 次检查(或每 10 秒)将产生一个 http 调用。
对于您的场景:
- 您开始每 10 秒轮询一次
- 你在第 3 秒停了一秒长
- 计数器确保还应进行 7 次检查(导致等待 7 秒)
- 第11秒发起http调用(10个正常+1秒等待)
这归结为拥有一个“可暂停计时器”。一旦你有了它,你就可以在它完成时使用 repeat
运算符来 re-run 它:
somePausableTimer$.pipe(
repeat(),
switchMap(() => of('polling time'))
)
如果您关心时间精度,这是我对“可暂停计时器”的看法 - 我认为它与 javascript 一样精确,但没有解决方案带来的性能损失,例如“每 10 毫秒检查一次暂停状态":
import {delay, distinctUntilChanged, filter, map, mapTo, pairwise, repeat,
scan, startWith, switchMap, take, withLatestFrom} from 'rxjs/operators';
import {defer, fromEvent, NEVER, Observable, of} from 'rxjs';
function pausableTimer(timerSpan: number, isActive$: Observable<boolean>): Observable<boolean> {
const activeState$ = isActive$.pipe(
distinctUntilChanged(),
startWith(true, true),
map(isActive => ({
isActive,
at: Date.now()
}))
);
const pauseSpans$ = activeState$.pipe(
pairwise(),
filter(([,curr]) => curr.isActive),
map(([prev, curr]) => curr.at - prev.at)
);
const accumulatedPauseSpan$ = pauseSpans$.pipe(
scan((acc, curr) => acc += curr, 0)
);
return defer(() => {
const startTime = Date.now();
const originalEndTime = startTime + timerSpan;
return activeState$.pipe(
withLatestFrom(accumulatedPauseSpan$),
switchMap(([activeState, accPause]) => {
if (activeState.isActive) {
return of(true).pipe(
delay(originalEndTime - Date.now() + accPause)
);
}
else {
return NEVER;
}
}),
take(1)
);
});
}
该函数接受一个以毫秒为单位的 timerSpan
(在您的情况下为 10000)和一个 isActive$
可观察值,它为 resume/pause 发出 true
/false
。所以把它放在一起:
const isActive$ = fromEvent(document, 'click').pipe(scan(acc => !acc, true)); // for example
pausableTimer(10000, isActive$).pipe(
repeat(),
switchMap(() => of('polling time'))
).subscribe();
我有一个每 10 秒长轮询一次的数据流。长 pol 可以暂停和继续。
问题是:它继续时重新开始。我必须让它从暂停的那一刻继续。
所以如果:
延迟(10000)
暂停于 (7000)
然后当我再次开始长轮询时,我的数据必须在 (10000-7000)=3000
之后拉取今天是第四天我一直被这个问题困得要疯了..
我创造了这个闪电战
https://stackblitz.com/edit/ang7-working-rxjs-stream-timer-combined?file=src/app/app.component.ts
创建了一个流和一个提取数据的计时器。当流程和计时器停止时,设置剩余的时间以及新的延迟。但是当我按下开始时它仍然不会再等待 3 秒它只是立即调用我不想要的服务
这是一个非常基本的实现,它依赖于 timer
函数和 take
、filter
和 takeUntil
运算符。
我认为这个解决方案不太优雅。它取决于状态变量 paused
和 first
,并且在订阅中的 next
回调中有一个递归触发器。另请注意,它目前不进行任何错误处理。在这种情况下,轮询完全停止。
控制器
export const REFRESH_INTERVAL = 11;
@Component({ ... })
export class AppComponent implements OnDestroy {
pause$ = new Subject();
reset$ = new Subject();
closed$ = new Subject();
counter = REFRESH_INTERVAL;
res: any;
paused = false;
constructor(private freeApiService: freeApiService) {}
ngOnInit() {
this.init();
}
init() {
let first = true;
this.reset$
.pipe(
switchMap(_ =>
timer(0, 1000).pipe(
take(this.counter),
tap(_ => (this.paused = this.paused ? false : this.paused)),
takeUntil(this.pause$),
filter(_ => first || --this.counter === 0),
finalize(
() => (this.paused = this.counter != 0 ? true : this.paused)
),
switchMap(_ => this.freeApiService.getDummy())
)
),
takeUntil(this.closed$)
)
.subscribe({
next: res => {
first = false;
this.res = res;
this.counter = REFRESH_INTERVAL;
this.reset$.next();
},
error: error => console.log("Error fethcing data:", error)
});
}
ngOnDestroy() {
this.closed$.next();
}
}
模板
<div my-app>
<ng-container *ngIf="res; else noRes">
<button (mouseup)="paused ? reset$.next() : pause$.next()">
{{ paused ? 'Resume poll' : 'Pause poll' }}
</button>
<br><br>
Refreshing in: {{ counter }}
<br>
Response:
<pre>{{ res | json }}</pre>
</ng-container>
<ng-template #noRes>
<button (mouseup)="reset$.next()">
Start poll
</button>
</ng-template>
</div>
我修改了你的Stackblitz
我会结合使用 interval
、filter
和 scan
运算符,并将检查频率设置为 1s:
const pauserSubject = new BehaviorSubject(true);
const s$ = interval(1000).pipe(
withLatestFrom(pauserSubject),
filter(([intervalValue, isAllowed]) => isAllowed),
scan(acc => acc + 1, 0),
tap(emissionNumber => console.log('check attempt #', emissionNumber, ' on ', new Date().toTimeString())),
filter(emissionCount => !(emissionCount % 10)),
switchMapTo(of('request/response'))
);
在这种情况下,常规 interval
设置“节拍”,每隔 1 秒检查一次暂停是真还是假。取消暂停后,相应的 filter
运算符允许继续。 scan
验证每 10 次检查(或每 10 秒)将产生一个 http 调用。
对于您的场景:
- 您开始每 10 秒轮询一次
- 你在第 3 秒停了一秒长
- 计数器确保还应进行 7 次检查(导致等待 7 秒)
- 第11秒发起http调用(10个正常+1秒等待)
这归结为拥有一个“可暂停计时器”。一旦你有了它,你就可以在它完成时使用 repeat
运算符来 re-run 它:
somePausableTimer$.pipe(
repeat(),
switchMap(() => of('polling time'))
)
如果您关心时间精度,这是我对“可暂停计时器”的看法 - 我认为它与 javascript 一样精确,但没有解决方案带来的性能损失,例如“每 10 毫秒检查一次暂停状态":
import {delay, distinctUntilChanged, filter, map, mapTo, pairwise, repeat,
scan, startWith, switchMap, take, withLatestFrom} from 'rxjs/operators';
import {defer, fromEvent, NEVER, Observable, of} from 'rxjs';
function pausableTimer(timerSpan: number, isActive$: Observable<boolean>): Observable<boolean> {
const activeState$ = isActive$.pipe(
distinctUntilChanged(),
startWith(true, true),
map(isActive => ({
isActive,
at: Date.now()
}))
);
const pauseSpans$ = activeState$.pipe(
pairwise(),
filter(([,curr]) => curr.isActive),
map(([prev, curr]) => curr.at - prev.at)
);
const accumulatedPauseSpan$ = pauseSpans$.pipe(
scan((acc, curr) => acc += curr, 0)
);
return defer(() => {
const startTime = Date.now();
const originalEndTime = startTime + timerSpan;
return activeState$.pipe(
withLatestFrom(accumulatedPauseSpan$),
switchMap(([activeState, accPause]) => {
if (activeState.isActive) {
return of(true).pipe(
delay(originalEndTime - Date.now() + accPause)
);
}
else {
return NEVER;
}
}),
take(1)
);
});
}
该函数接受一个以毫秒为单位的 timerSpan
(在您的情况下为 10000)和一个 isActive$
可观察值,它为 resume/pause 发出 true
/false
。所以把它放在一起:
const isActive$ = fromEvent(document, 'click').pipe(scan(acc => !acc, true)); // for example
pausableTimer(10000, isActive$).pipe(
repeat(),
switchMap(() => of('polling time'))
).subscribe();