如何使用 takeUntil 的多个条件来停止订阅

How to stop subscription by using multiple conditions with takeUntil

我想根据两个条件停止可观察订阅:

发生了什么:

它只是根据时间停止执行(使用import { timer } from 'rxjs/internal/observable/timer';

这是我当前的代码:

出于示例目的更改了属性、变量的名称及其值:

import { finalize } from 'rxjs/internal/operators/finalize';
import { interval } from 'rxjs/internal/observable/interval';
import { timer } from 'rxjs/internal/observable/timer';
import { takeUntil, first } from 'rxjs/operators';
import { merge, EMPTY, of } from 'rxjs';

.
. // Attributes and Class declaration here
.


async startProcess(): Promise<void> {

    this.isProcessLoading = true;

    const { someId } = await this.exampleService.execute().toPromise();

    const interval$ = interval(1000);
    const timeLimiter$ = timer(10000);

    const request$ = this.exampleService.doRequest();
    
    const isEventFinished$ = EMPTY;

    // My goal here is for the result of this function to return an observable that notifies 
    // if the first parameter emits an event OR if the second parameter emits another. That is, I want to notify if any condition is valid
    const stopConditions$ = merge(isEventFinished$, timeLimiter$);

    const handleSuccess = (object: MyType) => {

      if (object.status === 'FINALIZED') {

        this.object = object;
        isEventFinished$.subscribe();
      }
    };

    const handleError = () =>  this.showErrorComponent = true;

    interval$
    .pipe(takeUntil(stopConditions$))
    .pipe(finalize(() => this.isSimulationLoading = false))
    .subscribe(() => request$.subscribe(handleSuccess, handleError));
}

代码“有效”,因为 timeLimiter$ 在 10 秒后触发 takeUntil。但是,我希望在时间限制之前停止的可能性...

我希望 takeUntil 也能够从这里 运行:

isEventFinished$.subscribe()

如果上面的代码片段正确执行,它应该停止 interval$,但它没有。那是我的问题

我已经尝试过的:

  1. 我不知道两根管子与只使用一根这样的管子是否有区别:.pipe(takeUntil(stopConditions$), finalize(() => this.isSimulationLoading = false))。但是,我已经尝试过了,但没有用

  2. 已尝试将此 isEventFinished$ 替换为:const isEventFinished$ = of(1) 以及他的订阅者:timeLimiter$.pipe(first()).subscribe()。 但这也不起作用。实际上,这会阻止请求被执行(我不知道为什么)

我刚刚用 Stackblitz 尝试了这段代码,它“有效”了……但我不确定你到底想做什么?然后我做了一些更新以更好地了解发生了什么。

在此处查看 Stackblitz:https://stackblitz.com/edit/angular-takeuntil-deborahk

关键变化:

const stopConditions$ = merge(this.isEventFinished$, timeLimiter$).pipe(
  tap(s => console.log("stop", s))
);

interval$.pipe(takeUntil(stopConditions$)).subscribe({
  next: handleSuccess,
  error: handleError,
  complete: () => {
    this.isSimulationLoading = false;
    console.log("isSimulationLoading", this.isSimulationLoading)
  }
});

有帮助吗?

编辑:我添加了一个“完成”按钮来模拟导致完成操作的任何操作。

通过将其声明为 Subject 或 BehaviorSubject 将 isEventFinished 定义为 Observable(BehaviorSubject 有默认值,Subject 没有)。

  isEventFinished$ = new Subject<boolean>();

然后,每当 finished 事件发生时,使用 next 方法将一个值发送到 isEventFinished 流中。

this.isEventFinished$.next(true);

那么这段代码应该可以工作:

const stopConditions$ = merge(this.isEventFinished$, timeLimiter$).pipe(
  tap(s => console.log("stop", s))
);

interval$.pipe(takeUntil(stopConditions$)).subscribe({
  next: handleSuccess,
  error: handleError,
  complete: () => {
    this.isSimulationLoading = false;
    console.log("isSimulationLoading", this.isSimulationLoading);
  }
});

查看更新的闪电战。

这样行吗?

您还可以像这样在 takeuntil 中使用 && 运算符:

takeUntil(this.firstCondition$ && this.secondCondition$)