在 Rxjs 中如何暂停一个数据流并从第二个暂停的数据流继续?

In Rxjs How to pause a stream of data and continue from the second It was paused?

我有一个每 10 秒长轮询一次的数据流。长 pol 可以暂停继续

问题是:它继续时重新开始。我必须让它从暂停的那一刻继续。

所以如果:

延迟(10000)

暂停于 (7000)

然后当我再次开始长轮询时,我的数据必须在 (10000-7000)=3000

之后拉取

今天是第四天我一直被这个问题困得要疯了..

我创造了这个闪电战

https://stackblitz.com/edit/ang7-working-rxjs-stream-timer-combined?file=src/app/app.component.ts

创建了一个流和一个提取数据的计时器。当流程和计时器停止时,设置剩余的时间以及新的延迟。但是当我按下开始时它仍然不会再等待 3 秒它只是立即调用我不想要的服务

这是一个非常基本的实现,它依赖于 timer 函数和 takefiltertakeUntil 运算符。

我认为这个解决方案不太优雅。它取决于状态变量 pausedfirst,并且在订阅中的 next 回调中有一个递归触发器。另请注意,它目前不进行任何错误处理。在这种情况下,轮询完全停止。

控制器

export const REFRESH_INTERVAL = 11;

@Component({ ... })
export class AppComponent implements OnDestroy {
  pause$ = new Subject();
  reset$ = new Subject();
  closed$ = new Subject();
  counter = REFRESH_INTERVAL;
  res: any;
  paused = false;

  constructor(private freeApiService: freeApiService) {}

  ngOnInit() {
    this.init();
  }

  init() {
    let first = true;
    this.reset$
      .pipe(
        switchMap(_ =>
          timer(0, 1000).pipe(
            take(this.counter),
            tap(_ => (this.paused = this.paused ? false : this.paused)),
            takeUntil(this.pause$),
            filter(_ => first || --this.counter === 0),
            finalize(
              () => (this.paused = this.counter != 0 ? true : this.paused)
            ),
            switchMap(_ => this.freeApiService.getDummy())
          )
        ),
        takeUntil(this.closed$)
      )
      .subscribe({
        next: res => {
          first = false;
          this.res = res;
          this.counter = REFRESH_INTERVAL;
          this.reset$.next();
        },
        error: error => console.log("Error fethcing data:", error)
      });
  }

  ngOnDestroy() {
    this.closed$.next();
  }
}

模板

<div my-app>
    <ng-container *ngIf="res; else noRes">
        <button (mouseup)="paused ? reset$.next() : pause$.next()">
      {{ paused ? 'Resume poll' : 'Pause poll' }}
    </button>

        <br><br>
        Refreshing in: {{ counter }}
        <br>
    Response:
        <pre>{{ res | json }}</pre>
    </ng-container>

    <ng-template #noRes>
        <button (mouseup)="reset$.next()">
      Start poll
    </button>
    </ng-template>
</div>

我修改了你的Stackblitz

我会结合使用 intervalfilterscan 运算符,并将检查频率设置为 1s:

const pauserSubject = new BehaviorSubject(true);

const s$ = interval(1000).pipe(
  withLatestFrom(pauserSubject),
  filter(([intervalValue, isAllowed]) => isAllowed),
  scan(acc => acc + 1, 0),
  tap(emissionNumber => console.log('check attempt #', emissionNumber, ' on ', new Date().toTimeString())),
  filter(emissionCount => !(emissionCount % 10)),
  switchMapTo(of('request/response'))
);

Stackblitz example here

在这种情况下,常规 interval 设置“节拍”,每隔 1 秒检查一次暂停是真还是假。取消暂停后,相应的 filter 运算符允许继续。 scan 验证每 10 次检查(或每 10 秒)将产生一个 http 调用。

对于您的场景:

  1. 您开始每 10 秒轮询一次
  2. 你在第 3 秒停了一秒长
  3. 计数器确保还应进行 7 次检查(导致等待 7 秒)
  4. 第11秒发起http调用(10个正常+1秒等待)

这归结为拥有一个“可暂停计时器”。一旦你有了它,你就可以在它完成时使用 repeat 运算符来 re-run 它:

somePausableTimer$.pipe(
    repeat(),
    switchMap(() => of('polling time'))
)

如果您关心时间精度,这是我对“可暂停计时器”的看法 - 我认为它与 javascript 一样精确,但没有解决方案带来的性能损失,例如“每 10 毫秒检查一次暂停状态":

import {delay, distinctUntilChanged, filter, map, mapTo, pairwise, repeat, 
    scan, startWith, switchMap, take, withLatestFrom} from 'rxjs/operators';
import {defer, fromEvent, NEVER, Observable, of} from 'rxjs';

function pausableTimer(timerSpan: number, isActive$: Observable<boolean>): Observable<boolean> {

    const activeState$ = isActive$.pipe(
        distinctUntilChanged(),
        startWith(true, true),
        map(isActive => ({
            isActive,
            at: Date.now()
        }))
    );

    const pauseSpans$ = activeState$.pipe(
        pairwise(),
        filter(([,curr]) => curr.isActive),
        map(([prev, curr]) => curr.at - prev.at)
    );

    const accumulatedPauseSpan$ = pauseSpans$.pipe(
        scan((acc, curr) => acc += curr, 0)
    );

    return defer(() => {
        const startTime = Date.now();
        const originalEndTime = startTime + timerSpan;

        return activeState$.pipe(
            withLatestFrom(accumulatedPauseSpan$),
            switchMap(([activeState, accPause]) => {
                if (activeState.isActive) {
                    return of(true).pipe(
                        delay(originalEndTime - Date.now() + accPause)
                    );
                }
                else {
                    return NEVER;
                }
            }),
            take(1)
        );
    });
}

该函数接受一个以毫秒为单位的 timerSpan(在您的情况下为 10000)和一个 isActive$ 可观察值,它为 resume/pause 发出 true/false。所以把它放在一起:

const isActive$ = fromEvent(document, 'click').pipe(scan(acc => !acc, true)); // for example

pausableTimer(10000, isActive$).pipe(
    repeat(),
    switchMap(() => of('polling time'))
).subscribe();