从一个 observable 读取值,但一旦它(另一个)发出就切换到另一个

Read values from one observable but switch to another once it (the other one) emits

我想要的行为:

  1. 运行 HTTP 请求
  2. 立即在异步缓存中查找数据
  3. 如果缓存在 HTTP 发出之前有值 - 使用缓存值。
  4. 在 HTTP 值终于出现后使用它。
  5. 如果 HTTP 响应速度比缓存快 - 忽略缓存。

所以基本上我想启动两个异步进程,其中一个应该快速提供一个值,但如果它不提供 - 我只想使用来自较慢的 observable 的值,它无论如何都优先。

扩展我的评论:问题是并行触发两个 observable 并利用第一个发射,即使另一个 observable 尚未发射。

通常您可以使用 merge 函数。

但是您有一个条件(“如果 HTTP 响应速度快于缓存 - 忽略缓存。”merge 函数或任何函数本身都无法满足标准 RxJS 运算符。

但是很容易从现有的运算符在 RxJS 中编写自定义运算符。对于您的情况,您可以自定义 filter operator to suit your needs. See here 以简要介绍如何编写自定义运算符。

export const filterLateCache = () => {
  let serverEmitted = false;
  return <T>(source: Observable<T>) => {
    return source.pipe(
      filter((data: any) => {
        if (!!data.server) {
          serverEmitted = true;
          return true;
        } else if (!!data.cache) {
          if (serverEmitted) {
            return false;
          } else {
            return true;
          }
        } else {
          return false;
        }
      })
    );
  };
};

如您所见,检查传入通知中的布尔标志 servercache 以决定是否必须发出该值。因此,您需要使用 map 运算符将来自可观察对象的值附加到这些标志中。

merge(
  server$.pipe(
    map((value) => ({
      server: true,
      value: value,
    }))
  ),
  cache$.pipe(
    map((value) => ({
      cache: true,
      value: value,
    }))
  )
)
  .pipe(filterLateCache())
  .subscribe({
    next: ({ value }) => {       // <-- utilize destructuring to ignore boolean flags
      // handle response here
    },
    error: (error: any) => {
      // handle errors
    }
  });

工作示例:Stackblitz

也许 raceWith 值得一看:https://rxjs-dev.firebaseapp.com/api/operators/raceWith

基本上它看起来像:

server$.pipe(raceWith(cache$)).subscribe(/*side effect that must be done*/);

缺少的是它不满足要求 4。