RxJs - 如何 return notifier value with takeuntil operator

RxJs - How to return notifier value with takeuntil operator

我有一个简单的 Rxjs 计时器,它会一直运行直到通知程序发出一些东西,到这里为止非常基本。

enum TimerResult = {
    COMPLETE,
    ABORTED,
    SKIPPED
};

_notifier: Subject<TimerResult> = new Subject();
notifier$: Observable<TimerResult> = this._notifier.asObservable();

simpleTimer$ = interval(1000);

startTimer(): Observable<number> <-- **here I want a timerResult** {
  return simpleTimer$.pipe(
      tap(()=>doSomethingBeautifulWhileRunning),
      takeUntil(this.notifier$)
   )

}

我想要实现的是获得通知程序发出的值作为结果。

我不需要中间值,我只需要知道它何时完成以及结果如何。


simpleTimer$.pipe(
   tap(()=>doSomethingBeautifulWhileRunning),
   last(),
   takeUntil(this.notifier$)
).subscribe((result)=>{
   // Here, of course, I get the last value 
   // I need instead the value coming from notifier$
});

我用 Rx 运算符尝试了很多可能的解决方案,但其中 none 按预期工作。我发现唯一一个产生可接受结果的(但恕我直言,非常非常脏)是这样的:

startTimer(): Observable<TimerResult>{
    simpleTimer$.pipe(...).subscribe(); <-- fire the timer, automatically unsubscribed by takeUntil
    return this.notifier$.pipe(first());
}

什么是最好的 "Rx" 获取方式? 我希望我已经足够清楚了,非常感谢任何帮助:)

您可以 merge 带有原始流的通知程序并从那里获取结果 试试下面的例子。

const notifier = timer(5000).pipe(mapTo("done"));

merge(interval(1000)
  .pipe(takeUntil(notifier)),
   notifier 
  )
  .subscribe(console.log,null,
   _=>console.log('complete') 
    );

我想你可以 merge:

simpleTimer$.pipe(
  tap(() => doSomethingBeautifulWhileRunning),
  last(),
  merge(this.notifier$.pipe(take(1)),
  takeUntil(this.notifier$),
).subscribe((result)=>{
   // Here, of course, I get the last value 
   // I need instead the value coming from notifier$
});

不过我不确定它是否会按此顺序工作。也许你必须切换 mergetakeUntil.

你也可以使用 combineLastest

  startTimer(): Observable<any> {
    const timer$= this.simpleTimer$.pipe(
      tap((res)=>console.log(res)),
      takeUntil(this.notifier$)
    );
    return combineLatest(timer$,this.notifier$)
  }

//and use:
this.startTimer().subscribe(([timer,action])=>{
  console.log(timer,action)
})

stackblitz