RxJS:使用“switchMap”运算符时处理取消的事件
RxJS: Handle cancelled events when using `switchMap` operator
考虑以下片段
const { NEVER, timer } = rxjs;
const { catchError, switchMap, timeout } = rxjs.operators;
timer(0, 3000).pipe(
switchMap(() =>
timer(randomIntFromInterval(1, 4) * 1000).pipe( // <-- mock HTTP call
catchError(() => {
// do something on error
console.log('Caught error');
return NEVER;
})
)
),
).subscribe({
next: (value) => console.log('Next triggred')
});
// credit:
function randomIntFromInterval(min, max) {
const value = Math.floor(Math.random() * (max - min + 1) + min);
console.log(`Simulated HTTP call ${value}s`);
return value;
}
.as-console-wrapper { max-height: 100% !important; top: 0px }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.4.0/rxjs.umd.min.js"></script>
此处 catchError
仅在 HTTP 调用发出错误时触发。但是,如果 HTTP 调用在 3 秒的轮询计时器内没有 return 任何内容,则先前的请求将在下一次调用之前被 取消 。我想对这些已取消的请求执行错误处理(本质上是触发 catchError
运算符)。
我知道我们可以通过管道输入 timeout
且阈值小于 3 秒以引发错误。但我想在不使用 timeout
运算符的情况下处理它。
谁能想出更好的解决方案? TIA.
我认为一种方法是结合使用 race
和 share
:
const src$ = timer(0, 3000).pipe(
share(),
);
src$.pipe(
// Making sure the subscriber created from `race` receives
// the notification first, so that the error can be properly thrown.
delay(0),
switchMap(() => race(
timer(randomIntFromInterval(1, 4) * 1000),
src$.pipe(switchMapTo(throwError('Request took longer than 3 seconds!'))),
),
),
catchError(() => {
// do something on error
console.log('Caught error');
return NEVER;
})
).subscribe({
next: (value) => console.log('Next triggred')
});
这是我的思路:
通过使用 share
,我实际上是在告诉 RxJS 在 timer
之间放置一个 Subject
实例(我们现在将其称为 S
)和后续订阅者。这意味着所有最初会转到 timer
的订阅者将转而转到 Subject
。这是为了确保可以有尽可能多的订阅者,而不是每次都重新创建一个新的计时器。现在,让我们看看为什么我们必须考虑多个订阅者。
当计时器第一次发出 0
时,S
将有一个订阅者。然后,race
开始发挥作用:
race(
timer(randomIntFromInterval(1, 4) * 1000),
src$.pipe(switchMapTo(throwError('Request took longer than 3 seconds!'))),
),
这些是赛车手:实际请求和计时器的下一次通知。让我们看看可能的情况:
- 请求在 3 秒前完成;在这种情况下,
race
将完成它的工作并将取消订阅 loser,在这种情况下,observable 会抛出自定义错误
- 3 秒后请求没有完成;在这种情况下,获胜者是
src$.pipe(...)
,这意味着我们将得到一个错误,该错误将被来自 主管道 的 catchError
运算符捕获;这里也很重要的是要强调 delay(0)
的用法——确保首先抛出错误,然后取消订阅 switchMap
的内部可观察对象;在这种情况下,S
将获得 2 个订阅者 - 一个来自 主管道 ,另一个是使用 race
的结果,因此通过使用delay(0)
,第一个收到timer
通知的订阅者是race
的订阅者
我可以建议稍微不同的方法:您可以跟踪此类情况并应用您需要的逻辑,而不是抛出错误
这是执行此操作的操作员:
function switchMapWithOvertakeEvent<T, R>(
project: (value: T, index: number) => ObservableInput<R>,
onOvertake: (value: T) => void
): OperatorFunction<T, R> {
let awaitingResponse = false;
return (src$) =>
src$.pipe(
tap((v) => {
if (awaitingResponse) {
onOvertake(v);
}
awaitingResponse = true;
}),
switchMap(project),
tap(() => (awaitingResponse = false))
);
}
它可以与您的示例一起使用,如下所示
timer(0, 3000)
.pipe(
switchMapWithOvertakeEvent(
() =>
timer(randomIntFromInterval(1, 10) * 1000).pipe(
// <-- mock HTTP call
catchError(() => {
// do something on error
console.log('Caught error');
return NEVER;
})
),
() => console.log('http call cancelled')
)
)
.subscribe({
next: (value) => console.log('Next triggred'),
complete: () => console.log('complete'),
});
// credit:
function randomIntFromInterval(min, max) {
const value = Math.floor(Math.random() * (max - min + 1) + min);
console.log(`Simulated HTTP call ${value}s`);
return value;
}
你可以在这里玩代码https://stackblitz.com/edit/mjemgq?devtoolsheight=50
考虑以下片段
const { NEVER, timer } = rxjs;
const { catchError, switchMap, timeout } = rxjs.operators;
timer(0, 3000).pipe(
switchMap(() =>
timer(randomIntFromInterval(1, 4) * 1000).pipe( // <-- mock HTTP call
catchError(() => {
// do something on error
console.log('Caught error');
return NEVER;
})
)
),
).subscribe({
next: (value) => console.log('Next triggred')
});
// credit:
function randomIntFromInterval(min, max) {
const value = Math.floor(Math.random() * (max - min + 1) + min);
console.log(`Simulated HTTP call ${value}s`);
return value;
}
.as-console-wrapper { max-height: 100% !important; top: 0px }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.4.0/rxjs.umd.min.js"></script>
此处 catchError
仅在 HTTP 调用发出错误时触发。但是,如果 HTTP 调用在 3 秒的轮询计时器内没有 return 任何内容,则先前的请求将在下一次调用之前被 取消 。我想对这些已取消的请求执行错误处理(本质上是触发 catchError
运算符)。
我知道我们可以通过管道输入 timeout
且阈值小于 3 秒以引发错误。但我想在不使用 timeout
运算符的情况下处理它。
谁能想出更好的解决方案? TIA.
我认为一种方法是结合使用 race
和 share
:
const src$ = timer(0, 3000).pipe(
share(),
);
src$.pipe(
// Making sure the subscriber created from `race` receives
// the notification first, so that the error can be properly thrown.
delay(0),
switchMap(() => race(
timer(randomIntFromInterval(1, 4) * 1000),
src$.pipe(switchMapTo(throwError('Request took longer than 3 seconds!'))),
),
),
catchError(() => {
// do something on error
console.log('Caught error');
return NEVER;
})
).subscribe({
next: (value) => console.log('Next triggred')
});
这是我的思路:
通过使用 share
,我实际上是在告诉 RxJS 在 timer
之间放置一个 Subject
实例(我们现在将其称为 S
)和后续订阅者。这意味着所有最初会转到 timer
的订阅者将转而转到 Subject
。这是为了确保可以有尽可能多的订阅者,而不是每次都重新创建一个新的计时器。现在,让我们看看为什么我们必须考虑多个订阅者。
当计时器第一次发出 0
时,S
将有一个订阅者。然后,race
开始发挥作用:
race(
timer(randomIntFromInterval(1, 4) * 1000),
src$.pipe(switchMapTo(throwError('Request took longer than 3 seconds!'))),
),
这些是赛车手:实际请求和计时器的下一次通知。让我们看看可能的情况:
- 请求在 3 秒前完成;在这种情况下,
race
将完成它的工作并将取消订阅 loser,在这种情况下,observable 会抛出自定义错误 - 3 秒后请求没有完成;在这种情况下,获胜者是
src$.pipe(...)
,这意味着我们将得到一个错误,该错误将被来自 主管道 的catchError
运算符捕获;这里也很重要的是要强调delay(0)
的用法——确保首先抛出错误,然后取消订阅switchMap
的内部可观察对象;在这种情况下,S
将获得 2 个订阅者 - 一个来自 主管道 ,另一个是使用race
的结果,因此通过使用delay(0)
,第一个收到timer
通知的订阅者是race
的订阅者
我可以建议稍微不同的方法:您可以跟踪此类情况并应用您需要的逻辑,而不是抛出错误
这是执行此操作的操作员:
function switchMapWithOvertakeEvent<T, R>(
project: (value: T, index: number) => ObservableInput<R>,
onOvertake: (value: T) => void
): OperatorFunction<T, R> {
let awaitingResponse = false;
return (src$) =>
src$.pipe(
tap((v) => {
if (awaitingResponse) {
onOvertake(v);
}
awaitingResponse = true;
}),
switchMap(project),
tap(() => (awaitingResponse = false))
);
}
它可以与您的示例一起使用,如下所示
timer(0, 3000)
.pipe(
switchMapWithOvertakeEvent(
() =>
timer(randomIntFromInterval(1, 10) * 1000).pipe(
// <-- mock HTTP call
catchError(() => {
// do something on error
console.log('Caught error');
return NEVER;
})
),
() => console.log('http call cancelled')
)
)
.subscribe({
next: (value) => console.log('Next triggred'),
complete: () => console.log('complete'),
});
// credit:
function randomIntFromInterval(min, max) {
const value = Math.floor(Math.random() * (max - min + 1) + min);
console.log(`Simulated HTTP call ${value}s`);
return value;
}
你可以在这里玩代码https://stackblitz.com/edit/mjemgq?devtoolsheight=50