有条件地取消所有内部 observable 或切换到一个只发出一个值的新 observable

Conditionally cancel all inner observables or switch to a new one that emits only one value

我正在使用 Angular 开发一个应用程序,我正在尝试构建一个涉及多个可观察对象(发出多个值)和 HTTP 请求(发出一个值)的流程。

我想打开一个 InAppBrowser 让用户进行交易(支付)并读取回调,这是浏览器将尝试加载的新 url。我需要订阅浏览器的多个事件,以便在出现错误、加载新的 url 等时得到通知。

我想正确处理这个 RxJS 组合并在满足某些条件时取消所有 observable 以保证性能并且没有潜在的内存泄漏。

我已尝试关注:

  1. RxJS switchMap does not cancel inner merged observable
  2. Using SwitchMap() to Handle Canceling Previous Requests

为了避免复杂性,以下所有的 observables 和 request 都是假的。

发出一个值的 Observables:

getOrderId(): Observable<any> {
  return this.http.get<any>(
    'https://jsonplaceholder.typicode.com/posts/1'
  );
}

notifyBackend(url: string): Observable<any> {
  return this.http.get<any>(
    'https://jsonplaceholder.typicode.com/posts/1'
  ).pipe(delay(2000));
}

发出多个值的 Observable:

每一个都会随时发出,type 属性会告诉事件类型和发生了什么。

getMultipleOne(): Observable<any> {
  return of({type: 'exit'}).interval(3000);
}

getMultipleTwo(): Observable<any> {
  return of({type: 'loaderror', url: 'http://www.blabla.com'}).interval(2000);
}

getMultipleThree(): Observable<any> {
  // The url returned can be any, I'm waiting for mine.
  return of({type: 'loadstart', url: 'http://www.google.com'}).interval(1000);
}

我要完成的流程是:

  1. 调用 getOrderId 发出一个值并切换到另一个可观察值
  2. 同时监听三个observable的多个值。我将对这些事件做出反应并执行一些业务逻辑。
  3. 如果type退出,取消所有订阅,不通知来电者。意味着用户只需 closed/canceled 手动。
  4. 如果typeloadstart,检查url是否是我切换到另一个HTTP请求,或者,全部取消。
  5. 如果是我的url,调用后台保存结果并通知调用者显示一个不错的UI结果。

我试过:

initFlow() {
    return this.getOrderId() // Emit just one value
        .pipe(
            // Switch to another one that is the merge of 3 observables
            switchMap(() => {
                return merge(
                    getMultipleOne(),
                    getMultipleTwo(),
                    getMultipleThree()
                );
            }),
            // This tap will be executed every time the above observables emit something
            tap(event => console.log(event)),
            tap(event => {
                if (event.type === 'exit') {
                    // Cancel all previous observables if this happen?
                    // Since the flow is not completed, the caller must not be notified, I guess?
                }
            }),
            filter(event => event.type === 'loadstart'),
            pluck('url'),
            // If not meet, cancel all above without notifying the caller
            switchMap(url => this.isGoogle(url) ? EMPTY : of (url)),
            // Switch to another HTTP that emits one value or throw error to cancel all
            switchMap(url => {
                if (this.isMyPersonalWebSite(url)) {
                    return this.notifyBackend(url); // Emit just one value
                } else {
                    // Let the caller be notified by its error event to show UI
                    throw new Error('User transaction is failed');
                }
            }),
            // The previous HTTP just emit one value, this allow the caller to receive the completion event.
            // The caller does not need it, but, I guess is the right thing to happen in observables life.
            first(),
        );
}

来电者:

// This one will be unsubscribed later by the component when its onDestroy hook is called
this.myService.initFlow()
    .subscribe({
        next: res => console.log('next', res),
        error: e => console.log('error', e),
        complete: () => console.log('complete')
    });

我的问题:

  1. TODO部分,不知道怎么取消之前的所有 我使用 merge.
  2. 声明的可观察对象
  3. 当我returnEMPTY时,我什么也看不到,调用者也不是 收到 complete 事件。这正常吗?

我的目标:

  1. 如果满足条件,则取消所有发出多个值的 observable。
  2. notifyBackend() 发出时使用其 next 事件通知调用者,并在我手动抛出错误时使用其 error 事件在代码中。

任何避免内存泄漏的帮助或建议以及处理此问题的方法将不胜感激。

如果你想完成所有合并的 Observable,你需要在 siwtchMap() 中完成 merge()(我没有测试这段代码):

switchMap(() => {
  return merge(
    getMultipleOne(),
    getMultipleTwo(),
    getMultipleThree(),
  ).pipe(
    takeWhile(event => event.type !== 'exit'),
  );
}),
tap(event => {
  if (event === 'loaderror') {
    throw new Error(); // Will be wrapped into `error` notificaiton.
  }
}),
switchMap(event => {
  // At this point `event.type` must be `loadstart`
  if (this.isGoogle(event.url)) {
    return EMPTY;
  }

  if (this.isMyPersonalWebSite(event.url)) {
    return this.notifyBackend(event.url); // Emit just one value
  } else {
    // Let the caller be notified by its error event to show UI
    throw new Error('User transaction is failed');
  }
}),
take(1), // You shouldn't even need to use this

关于您的问题:

  • 要取消所有合并的Observable,需要在switchMap()内完成合并链(我这里用的是takeWhile

  • 在你的链中,你返回 EMPTY 但链以 first() 结束,这意味着当 this.isGoogle(url) === true 链将完成并且 first() 将发出错误。看到这个 .