Angular NgRx - 继续轮询仅第一次调用的服务的效果

Angular NgRx - Effect to continue polling a service only called the first time

我有一个应用程序,我刚刚在其中添加了 NgRX,我希望在其中使用效果来打开和关闭轮询。

示例大纲

我关注了 this post which seemed like a good approach. I have a simplified example of this here,大部分代码在 app.effects.ts

与示例类似,我有效果 startPolling$stopPolling$continuePolling$,除了我使用较新的 createEffect 工厂方法。

此外,我已将 delay(2000) 移到 takeWhile() 上方,因为我发现如果服务调用抛出错误,catchError(err => of(appActions.getDataFail(err))) 会导致效果进入没有延迟的连续非常快速的循环。

启动和停止按钮调度轮询启动和停止...

public start() {
    console.log('dispatching start');
    this.store.dispatch(appActions.startPolling());
  }

  public stop() {
    console.log('dispatching stop');
    this.store.dispatch(appActions.stopPolling());
  }

我的问题

我有一些控制台日志,所以我们可以看到发生了什么。

当我们单击 开始按钮(只是 第一次 时间)时,我可以看到轮询开始,并按预期继续。例如,我可以一遍又一遍地看到以下内容...

dispatching start
app effect started polling
app.service.getData
app effect continue polling
app.service.getData
app effect continue polling
app.service.getData
app effect continue polling

完美。

当我停下来时,我看到了

dispatching stop
app effect stop polling

也正确。

现在,问题 是在我尝试重新启动时 。如果我现在再次点击开始按钮,我看到的只是最初的开始轮询效果...

dispatching start
app effect started polling
app.service.getData

并且continuePolling$中的代码不再被调用,所以我没有轮询

有谁知道为什么这个效果没有被秒次触发?我只是想不通这是为什么。

更新 1

我想也许我的问题是,一旦 isPollingActive 设置为 false,并且 takeWhile(() => this.isPollingActive),“停止”,observable 不再处于活动状态,即 continuePolling$ 完成,所以永远不会重启?

假设这样,我尝试了以下方法,其中我有 2 个不同的变量,一个用于“暂停”轮询(例如,如果我在离线模式下检测到该应用程序),另一个用于取消(即用户何时导航组件外)。

所以,我的整个效果现在变成了...

    @Injectable()
    export class AppEffects {
      private isPollingCancelled: boolean;
      private isPollingPaused: boolean;

      constructor(
        private actions$: Actions,
        private store: Store<AppState>,
        private appDataService: AppDataService
      ) { }

      public startPolling$ = createEffect(() => this.actions$.pipe(
        ofType(appActions.startPolling),
        tap(_ => console.log('app effect started polling')),
        tap(() => {
          this.isPollingCancelled = false;
          this.isPollingPaused = false;
        }),        
          mergeMap(() =>
            this.appDataService.getData()
              .pipe(                        
                switchMap(data => {              
                  return [appActions.getDataSuccess(data)
                  ];
                  }),
                catchError(err => of(appActions.getDataFail(err)))
              ))
        ));

         public pausePolling$ = createEffect(() => this.actions$.pipe(
            ofType(appActions.pausePolling),
            tap(_ => this.isPollingPaused = true),
            tap(_ => console.log('app effect pause polling')),       
         ));
      
      public cancelPolling$ = createEffect(() => this.actions$.pipe(
        ofType(appActions.cancelPolling),
        tap(_ => this.isPollingCancelled = true),
        tap(_ => console.log('app effect cancel polling')),
      ));

        public continuePolling$ = createEffect(() => this.actions$.pipe(
          ofType(appActions.getDataSuccess, appActions.getDataFail),    
          tap(data => console.log('app effect continue polling')),  
          takeWhile(() => !this.isPollingCancelled),    
          delay(3000),  
     
          mergeMap(() =>
            this.appDataService.getData()
              .pipe(   
                delay(3000),  
                tap(data => console.log('app effect continue polling - inner loop')),  
                takeWhile(() => !this.isPollingPaused), // check again incase this has been unset since delay 
                switchMap(data => {              
                  return [appActions.getDataSuccess(data)
                  ];
                  }),
                catchError(err => of(appActions.getDataFail(err)))
              ))
        ));    
    } 

我不推荐运行上面的方法,因为当我发送一个pause polling action时,效果似乎陷入了死循环,我不得不通过任务管理器杀死浏览器。

我不知道为什么会这样,但我似乎比以前更远离解决方案。

更新 2

我注意到我没有从暂停和取消效果返回任何操作。

所以我已经更新了它们,我们遵循...

 public pausePolling$ = createEffect(() => this.actions$.pipe(
    ofType(appActions.pausePolling),
    tap(_ => this.isPollingPaused = true),
    tap(_ => console.log('app effect pause polling')),
    map(_ => appActions.pausePollingSuccess())
  ));
  
  public cancelPolling$ = createEffect(() => this.actions$.pipe(
    ofType(appActions.cancelPolling),
    tap(_ => {
      this.isPollingCancelled = true;
      this.isPollingPaused = true;
    }),
    tap(_ => console.log('app effect cancel polling')),
    map(_ => appActions.cancelPollingSuccess())
  ));

现在暂停似乎工作正常,但是当我调度 appActions.cancelPolling 时,我再次看到 app effect cancel polling 的无限循环被记录到控制台。

更新 3

我找到了为什么会出现无限循环以及如何停止它。根据 doco here,我可以添加 dispatch:false...

    public cancelPolling$ = createEffect(() => this.actions$.pipe(
        ofType(appActions.cancelPolling),
        tap(_ => {
          this.isPollingCancelled = true;
          this.isPollingPaused = true;
        }),
        tap(_ => console.log('app effect cancel polling')),
      ), { dispatch: false }); // <------ add this

这似乎解决了我的无限循环问题。

我现在唯一的任务是能够弄清楚如何能够启动、停止和重新启动轮询处理对 appDataService.getData() 的成功调用以及异常。

我可以让它为一个或另一个工作(取决于我把延迟和 takewhile 放在哪里),但不能同时为两者工作

更新 4

我有最新的代码here

运行 就这样,我让 getData 成功了,而且令人惊讶的是,暂停或停止操作都会停止它并允许它重新启动。我很惊讶 stop 操作允许它重新启动,因为我假设 takeWhile(() => !this.isPollingCancelled), 会取消效果。

此外,如果将 true 传递给 getData,这将导致其可观察到错误。轮询继续(按需要,即即使出现错误仍会重试),但是一旦我们现在发送暂停操作,它就不会停止轮询,如果我们发送停止,它确实会停止,但不会重新启动。我赢不了。

更新 5

我想也许因为继续轮询效果被取消了,我可以每次都重新创建它,如下所示..

    import { Injectable, OnInit, OnDestroy } from '@angular/core';
    import { createEffect, Actions, ofType } from '@ngrx/effects';
    import { select, Store } from '@ngrx/store';
    import { mergeMap, map, catchError, takeWhile, delay, tap, switchMap } from 'rxjs/operators';
    import { AppState } from './app.state';
    import { Observable, of } from 'rxjs';
    import { AppDataService } from '../app-data.service';
    import * as appActions from './app.actions';

    @Injectable()
    export class AppEffects {
      private isPollingCancelled: boolean;
      private isPollingPaused: boolean;

      constructor(
        private actions$: Actions,
        private store: Store<AppState>,
        private appDataService: AppDataService
      ) { }

      public startPolling$ = createEffect(() => this.actions$.pipe(
        ofType(appActions.startPolling),
        tap(_ => console.log('app effect started polling')),
        tap(() => {
          this.isPollingCancelled = false;
          this.isPollingPaused = false;
          this.createPollingEffect(); // <--- recreate the effect every time
        }),        
          mergeMap(() =>
            this.appDataService.getData()
              .pipe(                        
                switchMap(data => {              
                  return [appActions.getDataSuccess(data)
                  ];
                  }),
                catchError(err => of(appActions.getDataFail(err)))
              ))
        ));

      public pausePolling$ = createEffect(() => this.actions$.pipe(
        ofType(appActions.pausePolling),
        tap(_ => this.isPollingPaused = true),
        tap(_ => console.log('app effect pause polling')),
      ), { dispatch: false });
      
      public cancelPolling$ = createEffect(() => this.actions$.pipe(
        ofType(appActions.cancelPolling),
        tap(_ => {
          this.isPollingCancelled = true;
          this.isPollingPaused = true;
        }),
        tap(_ => console.log('app effect cancel polling')),
      ), { dispatch: false });

      public continuePolling$: any;

      private createPollingEffect(): void {
        console.log('creating continuePolling$');
        this.continuePolling$ = createEffect(() => this.actions$.pipe(
          ofType(appActions.getDataSuccess, appActions.getDataFail),
          tap(data => console.log('app effect continue polling')),
          delay(3000),
          takeWhile(() => !this.isPollingCancelled),
          mergeMap(() =>
            this.appDataService.getData(false)
              .pipe(
                tap(data => console.log('app effect continue polling - inner loop')),

                switchMap(data => {
                  return [appActions.getDataSuccess(data)
                  ];
                }),
                catchError(err => of(appActions.getDataFail(err)))
              ))
        ), { resubscribeOnError: true });
      } 
    }

因此,在 startPolling 中,我调用 this.createPollingEffect() 来创建继续轮询效果。

但是,当我尝试这样做时,轮询从未开始。

更新 6

我想出了一个似乎对我有用的解决方案。

我有以下

public startPolling$ = createEffect(() => this.actions$.pipe(
        ofType(dataActions.startPollingGetData),
        tap(_ => this.logger.info('effect start polling')),
        tap(() => this.isPollingActive = true),
        switchMap(_ => this.syncData())
      ), { dispatch: false });
      
    public continuePolling$ = createEffect(() => this.actions$.pipe(
        ofType(dataPlannerActions.DataSuccess,
          dataActions.DataFail),
        tap(_ => this.logger.debug('data effect continue polling')),
        tap(_ => this.isInDelay = true),
        delay(8000),
        tap(_ => this.isInDelay = false),
        switchMap(_ => this.syncData())
      ), { dispatch: false });


    public stopPolling$ = createEffect(() => this.actions$.pipe(
        ofType(dataActions.stopPollingData),
        tap(_ => this.isPollingActive = false),
        tap(_ => this.logger.info('data effect stop polling')),
        map(_ => dataActions.stopPollingDataSuccess())
      ), { dispatch: false });


    private syncData(): Observable<Action> {
        const result$: Observable<Action> = Observable.create(async subscriber => {
          try {
            // If polling "switched off", we just need to return anything (not actually used)
            // Id isInDelay, we may be restating while we still have a pending delay.
            // In this case we will exit, and just wait for the delay to restart
            // (otherwise we can end up with more than one call to this)
            if (this.isInDelay || !this.isPollingActive) {
              subscriber.next("");
              return;
            }

我在这里使用了几个“标志”,我相信你会是一种更“rxy”的方式。

事实上, 关于如何摆脱 isInDelay(我只需要设法将其放入上面的生产代码中)

改用它:

public startPolling$ = createEffect(() => this.actions$.pipe(
  ofType(appActions.startPolling),    
  tap(_ => console.log('app effect started polling')),  
  tap(() => this.isPollingActive = true),        
  switchMap(() =>
    this.appDataSurvice.getData()
      .pipe(                        
        exhaustMap(data => {              
          return [appActions.getDataSuccess(data)];
        }),
        catchError(err => of(appActions.getDataFail(err)))
      ))
));

您处理问题的方式值得称赞。我在重新启动轮询时遇到了完全相同的问题,这篇文章对我有所帮助。

我现在面临的一个问题是,如果轮询在不到 3 秒(指定的计时器)内重新启动,则有多次调用该服务。换句话说,轮询 pauses/stops 只有在间隔过去后才完全进行。因此,如果您尝试在计时器结束之前再次启动它,则会有多个线程 运行。刚刚在服务调用中添加了时间戳@https://angular-ngrx-polling3-j7b8st.stackblitz.io

每次轮询都会调用两次服务。

我把它作为我 question/discussion 的一部分,但我认为可以作为一种解决方案,使它更显眼...

我想出了一个似乎对我有用的解决方案。

我有以下

public startPolling$ = createEffect(() => this.actions$.pipe(
        ofType(dataActions.startPollingGetData),
        tap(_ => this.logger.info('effect start polling')),
        tap(() => this.isPollingActive = true),
        switchMap(_ => this.syncData())
      ), { dispatch: false });

    public continuePolling$ = createEffect(() => this.actions$.pipe(
        ofType(dataPlannerActions.DataSuccess,
          dataActions.DataFail),
        tap(_ => this.logger.debug('data effect continue polling')),
        tap(_ => this.isInDelay = true),
        delay(8000),
        tap(_ => this.isInDelay = false),
        switchMap(_ => this.syncData())
      ), { dispatch: false });


    public stopPolling$ = createEffect(() => this.actions$.pipe(
        ofType(dataActions.stopPollingData),
        tap(_ => this.isPollingActive = false),
        tap(_ => this.logger.info('data effect stop polling')),
        map(_ => dataActions.stopPollingDataSuccess())
      ), { dispatch: false });


    private syncData(): Observable<Action> {
        const result$: Observable<Action> = Observable.create(async subscriber => {
          try {
            // If polling "switched off", we just need to return anything (not actually used)
            // Id isInDelay, we may be restating while we still have a pending delay.
            // In this case we will exit, and just wait for the delay to restart
            // (otherwise we can end up with more than one call to this)
            if (this.isInDelay || !this.isPollingActive) {
              subscriber.next("");
              return;
            }

我在这里使用了几个 "flags",我相信你会是一个更 "rxy" 的方法。

事实上, 关于如何摆脱 isInDelay(我只需要设法将其放入上面的生产代码中)

根据@peterc 和@Ingo Bürk 的意见,我能够对所有场景进行正面测试。下面是我的代码的样子。

@Effect()
      getPageData$ = this.actions$.pipe(
        ofType(actions.StartLoading),
        tap(() => {
          this.appService.isPollingActive = true;
        }),
        mergeMap(() =>
          this.appService.getData().pipe(
            switchMap((response: GridDataResponse) => {
              return [new actions.DoneLoading(response.data)];
            }),
            retry(1),
            catchError(err => {
              return of(new actions.FailedLoading());
            })
          ))
      );

      @Effect()
      public stopPolling$ = this.actions$.pipe(
        ofType(actions.StopPolling),
        tap(_ => {
          this.appService.isPollingActive = false;
        }),
        mergeMap(() => {
          return [new actions.ResetLoading()];
        })
      );

      @Effect()
      public continuePolling$ = this.actions$.pipe(
        ofType(actions.DoneLoading,
          actions.FailedLoading),
        switchMap(_ =>
          timer(this.appService.pollingTimer).pipe(
            takeUntil(this.actions$.pipe(ofType(actions.StopPolling))),
            mergeMap(() =>
            this.appService.getData().pipe(
              takeWhile(() => this.appService.isPollingActive),
              switchMap((response: GridDataResponse) => {
                return [new actions.DoneLoading(response.data)];
              }),
              catchError(err => {
                return of(new actions.FailedLoading());
              })
            ))
          )
      )
      );