从一个 observable 读取值,但一旦它(另一个)发出就切换到另一个
Read values from one observable but switch to another once it (the other one) emits
我想要的行为:
- 运行 HTTP 请求
- 立即在异步缓存中查找数据
- 如果缓存在 HTTP 发出之前有值 - 使用缓存值。
- 在 HTTP 值终于出现后使用它。
- 如果 HTTP 响应速度比缓存快 - 忽略缓存。
所以基本上我想启动两个异步进程,其中一个应该快速提供一个值,但如果它不提供 - 我只想使用来自较慢的 observable 的值,它无论如何都优先。
扩展我的评论:问题是并行触发两个 observable 并利用第一个发射,即使另一个 observable 尚未发射。
通常您可以使用 merge
函数。
但是您有一个条件(“如果 HTTP 响应速度快于缓存 - 忽略缓存。”)merge
函数或任何函数本身都无法满足标准 RxJS 运算符。
但是很容易从现有的运算符在 RxJS 中编写自定义运算符。对于您的情况,您可以自定义 filter
operator to suit your needs. See here 以简要介绍如何编写自定义运算符。
export const filterLateCache = () => {
let serverEmitted = false;
return <T>(source: Observable<T>) => {
return source.pipe(
filter((data: any) => {
if (!!data.server) {
serverEmitted = true;
return true;
} else if (!!data.cache) {
if (serverEmitted) {
return false;
} else {
return true;
}
} else {
return false;
}
})
);
};
};
如您所见,检查传入通知中的布尔标志 server
和 cache
以决定是否必须发出该值。因此,您需要使用 map
运算符将来自可观察对象的值附加到这些标志中。
merge(
server$.pipe(
map((value) => ({
server: true,
value: value,
}))
),
cache$.pipe(
map((value) => ({
cache: true,
value: value,
}))
)
)
.pipe(filterLateCache())
.subscribe({
next: ({ value }) => { // <-- utilize destructuring to ignore boolean flags
// handle response here
},
error: (error: any) => {
// handle errors
}
});
工作示例:Stackblitz
也许 raceWith
值得一看:https://rxjs-dev.firebaseapp.com/api/operators/raceWith
基本上它看起来像:
server$.pipe(raceWith(cache$)).subscribe(/*side effect that must be done*/);
缺少的是它不满足要求 4。
我想要的行为:
- 运行 HTTP 请求
- 立即在异步缓存中查找数据
- 如果缓存在 HTTP 发出之前有值 - 使用缓存值。
- 在 HTTP 值终于出现后使用它。
- 如果 HTTP 响应速度比缓存快 - 忽略缓存。
所以基本上我想启动两个异步进程,其中一个应该快速提供一个值,但如果它不提供 - 我只想使用来自较慢的 observable 的值,它无论如何都优先。
扩展我的评论:问题是并行触发两个 observable 并利用第一个发射,即使另一个 observable 尚未发射。
通常您可以使用 merge
函数。
但是您有一个条件(“如果 HTTP 响应速度快于缓存 - 忽略缓存。”)merge
函数或任何函数本身都无法满足标准 RxJS 运算符。
但是很容易从现有的运算符在 RxJS 中编写自定义运算符。对于您的情况,您可以自定义 filter
operator to suit your needs. See here 以简要介绍如何编写自定义运算符。
export const filterLateCache = () => {
let serverEmitted = false;
return <T>(source: Observable<T>) => {
return source.pipe(
filter((data: any) => {
if (!!data.server) {
serverEmitted = true;
return true;
} else if (!!data.cache) {
if (serverEmitted) {
return false;
} else {
return true;
}
} else {
return false;
}
})
);
};
};
如您所见,检查传入通知中的布尔标志 server
和 cache
以决定是否必须发出该值。因此,您需要使用 map
运算符将来自可观察对象的值附加到这些标志中。
merge(
server$.pipe(
map((value) => ({
server: true,
value: value,
}))
),
cache$.pipe(
map((value) => ({
cache: true,
value: value,
}))
)
)
.pipe(filterLateCache())
.subscribe({
next: ({ value }) => { // <-- utilize destructuring to ignore boolean flags
// handle response here
},
error: (error: any) => {
// handle errors
}
});
工作示例:Stackblitz
也许 raceWith
值得一看:https://rxjs-dev.firebaseapp.com/api/operators/raceWith
基本上它看起来像:
server$.pipe(raceWith(cache$)).subscribe(/*side effect that must be done*/);
缺少的是它不满足要求 4。