仅使用 RxJS 运算符缓存 Http 请求

Cache Http requests using only RxJS operators

我正在尝试实现此处描述的内容:https://www.prestonlamb.com/blog/rxjs-cache-and-refresh-in-angular

换句话说,我想在给定时间(比如 1 分钟)内缓存一个可观察对象。当在给定时间之后进行订阅时,应再次检索数据并再次缓存 1 分钟。

预期结果示例:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:30: Request (8) => RETRIEVE data

shareReplay 运算符在给定时间缓存数据时工作正常,但我无法在给定时间过去后重新启动它。

使用 shareRelay(1, 1000) 运算符的示例:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => no response
T 01:15: Request (6) => no response
T 01:30: Request (7) => no response
T 02:30: Request (8) => no response

上面的 link 尝试使用第一个运算符捕获空结果来更改该行为。不幸的是,它不能正常工作,因为在第一次之后没有缓存数据。

这是我使用上面的文章得到的link(下图是描述使用的代码)

我得到的结果:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
T 01:15: Request (6) => RETRIEVE data
T 01:30: Request (7) => RETRIEVE data
T 02:30: Request (8) => RETRIEVE data

我也看过一些定时器运算符的例子,但在那种情况下,数据每分钟检索一次,即使没有订阅。我不想每分钟都刷新数据,我想每分钟都过期缓存。不幸的是,我丢失了定时器运算符的代码,但结果是这样的:

定时器运算符的结果:

T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:00: NO REQUEST => RETRIEVE data
T 01:10: Request (5) => data from cache
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:00: NO REQUEST => RETRIEVE data
T 02:30: Request (8) => data from cache

谁有 "pure" RxJS 解决方案来做我想做的事?

我认为您提供的解决方案 link 有一个小错误,正如我试图在此 StackBlitz 中强调的那样。 (或者我可能误解了这个想法)

你可以试试这个:

const refetchSbj = new Subject();
const refetchData$ = refetchSbj.pipe(
    switchMap(() => service.fetchData())
  ).pipe(share());

merge(
  src$,
  refetchData$
).pipe(
  shareReplay(1, 1000),
  buffer(concat(timer(0), refetchData$)),
  tap(values => !values.length && refetchSbj.next()),
  filter(values => values.length !== 0),
  // In case there is only one value,
  map(([v]) => v),
  // Might want to add this, because each subscriber will receive the value emitted by the `shareReplay`
  take(1)
)

shareReplay 内部使用 ReplaySubject,它将所有缓存值 同步 发送给新订阅者。 timer(0) 类似于 setTimeout(fn, 0),但重要的方面是它是 异步的 ,它允许 buffer 收集 [=] 发出的值13=]。

buffer(concat(timer(0), refetchData$)), - 我们要确保提供给 buffer 的内部可观察对象不会完成,否则整个流都会完成。 refetchData$ 会在这种情况下发出新获取的数据(稍后我们会看到)。

tap(values => !values.length && refetchSbj.next()) - 如果没有发出任何值,则意味着正在使用的 ReplaySubject 没有任何值,这意味着时间已经过去。如果是这样,在 refetchSbj 的帮助下,我们可以重新填充缓存。


这就是我们可视化流程的方式:

T 00:00: Request (1) => RETRIEVE data
1) `refetchSbj.next()`
2) shareReplay will send the value resulted from `service.fetchData()` to the subscriber
3) the newly fetched value will be added to the `buffer`, and then the `refetchData$` from `concat(timer(0), refetchData$)` will emit(this is why we've used `share()`), meaning that `values` will not be an empty array
4) take(1) is reached, the value will be sent to the subscriber and then it will complete, so the `ReplaySubject` from `shareReplay()` will have no subscribers.

T 00:10: Request (2) => data from cache
`values` will not be empty, so `refetchSbj` won't emit and `take(1)` will be reached

T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
Same as `Request (1)`
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:30: Request (8) => RETRIEVE data

我会考虑以下策略。

首先你创建一个函数createCachedSource,它returns一个Observable,它的缓存通过shareReplay(1)实现。

然后我会使用这个函数来设置一个变量source$,这是客户端必须用来订阅以获取数据的变量。

现在的诀窍是在所需的时间间隔重置 source$,再次使用 createCachedSource

代码中的所有这些概念如下所示

// this function created an Observable cached via shareReplay(1)
const createCachedSource = () =>
  of(1).pipe(
    tap((d) => console.log(`Go out and fetch ${d}`)),
    shareReplay(1),
    tap((d) => console.log(`Return cached data ${d}`))
  );

// source$ is the Observable the clients must subscribe to
let source$ = createCachedSource();

// every 1 sec reset the cache by creating a new Observable and setting it as value of source$
interval(1000)
  .pipe(
    take(10),
    tap(() => (source$ = createCachedSource()))
  )
  .subscribe();

// here we simulate 30 subscriptions to source$, one every 300 mS
interval(300)
  .pipe(
    take(30),
    switchMap(() => source$)
  )
  .subscribe();

Observable const shared$ = data$.pipe(shareReplay(1, 1000)) 将缓存一个值 1000 毫秒。在那之后,它只会向未来的订阅者发送完整的通知(如果 data$ 已完成)。通过检查 shared$ 是否完成,您知道缓存已过期,您必须创建一个新的 shared$ Observable 供当前和未来的订阅者使用。

您可以使用更高阶的 Observable 来为您的订阅者提供那些 shared$ Observables。

const createShared = () => data$.pipe(shareReplay(1, 1000))
const sharedSupplier = new BehaviorSubject(createShared())

const cache = sharedSupplier.pipe(
  concatMap(shared$ => shared$.pipe(
    tap({ complete: () => sharedSupplier.next(createShared()) }),
  )),
  take(1) // emit once and complete so subscribers don't receive values from newly created shared Observables
)

https://stackblitz.com/edit/ggasua-2ntq7n


使用您发布的图像中的运算符,您还可以:

const cache = sharedSupplier.pipe(
  concatMap(shared$ => shared$.pipe(
    first(null, defer(() => (sharedSupplier.next(createShared()), EMPTY))),
    mergeMap(d => isObservable(d) ? d : of(d))
  )),
  take(1)
)

但这是更多的代码,结果是一样的。