RxJs Interval with takeUntil 发布最后一个值

RxJs Interval with takeUntil to publish last value

我有一些代码可以轮询直到任务完成

见下文

this.simulationStatus =
  interval(2000).pipe(
    switchMap(
      () => from(this.simulationService.getSimulationStatus(this.route.snapshot.paramMap.get('jobId')))),
    takeUntil(this.stopPoll),
    tap(simulation => {
      if (simulation && simulation.complete) {
        if (this.stopCount == 1) {
          // Get once after complete
          this.stopPoll.next(true);
        }
        this.stopCount++;
      }
    })
  );

我试过使用 takeUntil 和 takeWhile,但问题是一旦任务完成,最后一个值就永远不会发布。

为了解决这个问题,我必须将 tap 方法包含在 stopPoll 主题中,并递增 stopCount 以获得最后一个值。

所以上面的方法可行,但感觉有点乱,我确定一定有更好的方法来实现这个目标?

我本以为 takeUntil 会发布最后一个值,或者有一个覆盖告诉它例如 takeUntil(observable, {publishLast: true})

BTW 更新,observable 被 Angular 6 模板订阅 提前致谢

如果您想完成可观察对象,您还可以使用 next() 创建主题并发出。

this.stopPoll: Subject<any> = new Subject<any>();

如果你想完成订阅。你可以打电话 this.stopPoll.next(true);

您可以访问 subscribe() 中的数据

this.simulationStatus.subscribe(success=>{}, failure=>{}, complete=>{});

您可以做的一件事是使用自定义的类似 takeWhile 的运算符,如下所示:

const completeWith = <T>(predicate: (arg: T) => boolean) => (
  source: Observable<T>,
) =>
  new Observable<T>(observer =>
    source.subscribe(
      value => {
        observer.next(value);
        if (predicate(value)) {
          observer.complete();
        }
      },
      error => observer.error(error),
      () => observer.complete(),
    ),
  );

将其视为 takeWhite 的变体似乎不是一个好主意,因为它不只是 采用 a条件成立,但也会发出额外的值。

可能更优雅的解决方案是让模拟状态 observable 发出两种值:下一个通知和完成通知,类似于 materialize/dematerialize 运算符的工作方式。

这同时在 rxjs 中实现为 takeWhile(condition, ?inclusive):

timer(0, 10).pipe(
    takeWhile((x) => x < 3, true)
)

发出 0、1、2、3