RxJS:使用“switchMap”运算符时处理取消的事件

RxJS: Handle cancelled events when using `switchMap` operator

考虑以下片段

const { NEVER, timer } = rxjs;
const { catchError, switchMap, timeout } = rxjs.operators;

timer(0, 3000).pipe(
  switchMap(() => 
    timer(randomIntFromInterval(1, 4) * 1000).pipe(  // <-- mock HTTP call
      catchError(() => {
        // do something on error
        console.log('Caught error');
        return NEVER;
      })
    )
  ),
).subscribe({
  next: (value) => console.log('Next triggred')
});

// credit: 
function randomIntFromInterval(min, max) {
  const value = Math.floor(Math.random() * (max - min + 1) + min);
  console.log(`Simulated HTTP call ${value}s`);
  return value;
}
.as-console-wrapper { max-height: 100% !important; top: 0px }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.4.0/rxjs.umd.min.js"></script>

此处 catchError 仅在 HTTP 调用发出错误时触发。但是,如果 HTTP 调用在 3 秒的轮询计时器内没有 return 任何内容,则先前的请求将在下一次调用之前被 取消 。我想对这些已取消的请求执行错误处理(本质上是触发 catchError 运算符)。

我知道我们可以通过管道输入 timeout 且阈值小于 3 秒以引发错误。但我想在不使用 timeout 运算符的情况下处理它。

谁能想出更好的解决方案? TIA.

我认为一种方法是结合使用 raceshare:

const src$ = timer(0, 3000).pipe(
  share(),
);

src$.pipe(
  // Making sure the subscriber created from `race` receives
  // the notification first, so that the error can be properly thrown.
  delay(0),

  switchMap(() => race(
      timer(randomIntFromInterval(1, 4) * 1000),
      src$.pipe(switchMapTo(throwError('Request took longer than 3 seconds!'))),
    ),
  ),
  catchError(() => {
    // do something on error
    console.log('Caught error');
    return NEVER;
  })
).subscribe({
  next: (value) => console.log('Next triggred')
});

这是我的思路:

通过使用 share,我实际上是在告诉 RxJS 在 timer 之间放置一个 Subject 实例(我们现在将其称为 S)和后续订阅者。这意味着所有最初会转到 timer 的订阅者将转而转到 Subject。这是为了确保可以有尽可能多的订阅者,而不是每次都重新创建一个新的计时器。现在,让我们看看为什么我们必须考虑多个订阅者。

当计时器第一次发出 0 时,S 将有一个订阅者。然后,race 开始发挥作用:

race(
  timer(randomIntFromInterval(1, 4) * 1000),
  src$.pipe(switchMapTo(throwError('Request took longer than 3 seconds!'))),
),

这些是赛车手:实际请求和计时器的下一次通知。让我们看看可能的情况:

  • 请求在 3 秒前完成;在这种情况下,race 将完成它的工作并将取消订阅 loser,在这种情况下,observable 会抛出自定义错误
  • 3 秒后请求没有完成;在这种情况下,获胜者是 src$.pipe(...),这意味着我们将得到一个错误,该错误将被来自 主管道 catchError 运算符捕获;这里也很重要的是要强调 delay(0) 的用法——确保首先抛出错误,然后取消订阅 switchMap 的内部可观察对象;在这种情况下,S 将获得 2 个订阅者 - 一个来自 主管道 ,另一个是使用 race 的结果,因此通过使用delay(0),第一个收到timer通知的订阅者是race的订阅者

我可以建议稍微不同的方法:您可以跟踪此类情况并应用您需要的逻辑,而不是抛出错误

这是执行此操作的操作员:

function switchMapWithOvertakeEvent<T, R>(
  project: (value: T, index: number) => ObservableInput<R>,
  onOvertake: (value: T) => void
): OperatorFunction<T, R> {
  let awaitingResponse = false;
  return (src$) =>
    src$.pipe(
      tap((v) => {
        if (awaitingResponse) {
          onOvertake(v);
        }
        awaitingResponse = true;
      }),
      switchMap(project),
      tap(() => (awaitingResponse = false))
    );
}

它可以与您的示例一起使用,如下所示

timer(0, 3000)
  .pipe(
    switchMapWithOvertakeEvent(
      () =>
        timer(randomIntFromInterval(1, 10) * 1000).pipe(
          // <-- mock HTTP call
          catchError(() => {
            // do something on error
            console.log('Caught error');
            return NEVER;
          })
        ),
      () => console.log('http call cancelled')
    )
  )
  .subscribe({
    next: (value) => console.log('Next triggred'),
    complete: () => console.log('complete'),
  });

// credit: 
function randomIntFromInterval(min, max) {
  const value = Math.floor(Math.random() * (max - min + 1) + min);
  console.log(`Simulated HTTP call ${value}s`);
  return value;
}

你可以在这里玩代码https://stackblitz.com/edit/mjemgq?devtoolsheight=50