仅使用 RxJS 运算符缓存 Http 请求
Cache Http requests using only RxJS operators
我正在尝试实现此处描述的内容:https://www.prestonlamb.com/blog/rxjs-cache-and-refresh-in-angular
换句话说,我想在给定时间(比如 1 分钟)内缓存一个可观察对象。当在给定时间之后进行订阅时,应再次检索数据并再次缓存 1 分钟。
预期结果示例:
T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:30: Request (8) => RETRIEVE data
shareReplay 运算符在给定时间缓存数据时工作正常,但我无法在给定时间过去后重新启动它。
使用 shareRelay(1, 1000) 运算符的示例:
T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => no response
T 01:15: Request (6) => no response
T 01:30: Request (7) => no response
T 02:30: Request (8) => no response
上面的 link 尝试使用第一个运算符捕获空结果来更改该行为。不幸的是,它不能正常工作,因为在第一次之后没有缓存数据。
这是我使用上面的文章得到的link(下图是描述使用的代码)
我得到的结果:
T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
T 01:15: Request (6) => RETRIEVE data
T 01:30: Request (7) => RETRIEVE data
T 02:30: Request (8) => RETRIEVE data
我也看过一些定时器运算符的例子,但在那种情况下,数据每分钟检索一次,即使没有订阅。我不想每分钟都刷新数据,我想每分钟都过期缓存。不幸的是,我丢失了定时器运算符的代码,但结果是这样的:
定时器运算符的结果:
T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:00: NO REQUEST => RETRIEVE data
T 01:10: Request (5) => data from cache
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:00: NO REQUEST => RETRIEVE data
T 02:30: Request (8) => data from cache
谁有 "pure" RxJS 解决方案来做我想做的事?
我认为您提供的解决方案 link 有一个小错误,正如我试图在此 StackBlitz 中强调的那样。 (或者我可能误解了这个想法)
你可以试试这个:
const refetchSbj = new Subject();
const refetchData$ = refetchSbj.pipe(
switchMap(() => service.fetchData())
).pipe(share());
merge(
src$,
refetchData$
).pipe(
shareReplay(1, 1000),
buffer(concat(timer(0), refetchData$)),
tap(values => !values.length && refetchSbj.next()),
filter(values => values.length !== 0),
// In case there is only one value,
map(([v]) => v),
// Might want to add this, because each subscriber will receive the value emitted by the `shareReplay`
take(1)
)
shareReplay
内部使用 ReplaySubject
,它将所有缓存值 同步 发送给新订阅者。 timer(0)
类似于 setTimeout(fn, 0)
,但重要的方面是它是 异步的 ,它允许 buffer
收集 [=] 发出的值13=]。
buffer(concat(timer(0), refetchData$)),
- 我们要确保提供给 buffer
的内部可观察对象不会完成,否则整个流都会完成。 refetchData$
会在这种情况下发出新获取的数据(稍后我们会看到)。
tap(values => !values.length && refetchSbj.next())
- 如果没有发出任何值,则意味着正在使用的 ReplaySubject
没有任何值,这意味着时间已经过去。如果是这样,在 refetchSbj
的帮助下,我们可以重新填充缓存。
这就是我们可视化流程的方式:
T 00:00: Request (1) => RETRIEVE data
1) `refetchSbj.next()`
2) shareReplay will send the value resulted from `service.fetchData()` to the subscriber
3) the newly fetched value will be added to the `buffer`, and then the `refetchData$` from `concat(timer(0), refetchData$)` will emit(this is why we've used `share()`), meaning that `values` will not be an empty array
4) take(1) is reached, the value will be sent to the subscriber and then it will complete, so the `ReplaySubject` from `shareReplay()` will have no subscribers.
T 00:10: Request (2) => data from cache
`values` will not be empty, so `refetchSbj` won't emit and `take(1)` will be reached
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
Same as `Request (1)`
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:30: Request (8) => RETRIEVE data
我会考虑以下策略。
首先你创建一个函数createCachedSource
,它returns一个Observable,它的缓存通过shareReplay(1)
实现。
然后我会使用这个函数来设置一个变量source$
,这是客户端必须用来订阅以获取数据的变量。
现在的诀窍是在所需的时间间隔重置 source$
,再次使用 createCachedSource
。
代码中的所有这些概念如下所示
// this function created an Observable cached via shareReplay(1)
const createCachedSource = () =>
of(1).pipe(
tap((d) => console.log(`Go out and fetch ${d}`)),
shareReplay(1),
tap((d) => console.log(`Return cached data ${d}`))
);
// source$ is the Observable the clients must subscribe to
let source$ = createCachedSource();
// every 1 sec reset the cache by creating a new Observable and setting it as value of source$
interval(1000)
.pipe(
take(10),
tap(() => (source$ = createCachedSource()))
)
.subscribe();
// here we simulate 30 subscriptions to source$, one every 300 mS
interval(300)
.pipe(
take(30),
switchMap(() => source$)
)
.subscribe();
Observable const shared$ = data$.pipe(shareReplay(1, 1000))
将缓存一个值 1000 毫秒。在那之后,它只会向未来的订阅者发送完整的通知(如果 data$
已完成)。通过检查 shared$
是否完成,您知道缓存已过期,您必须创建一个新的 shared$
Observable 供当前和未来的订阅者使用。
您可以使用更高阶的 Observable 来为您的订阅者提供那些 shared$
Observables。
const createShared = () => data$.pipe(shareReplay(1, 1000))
const sharedSupplier = new BehaviorSubject(createShared())
const cache = sharedSupplier.pipe(
concatMap(shared$ => shared$.pipe(
tap({ complete: () => sharedSupplier.next(createShared()) }),
)),
take(1) // emit once and complete so subscribers don't receive values from newly created shared Observables
)
https://stackblitz.com/edit/ggasua-2ntq7n
使用您发布的图像中的运算符,您还可以:
const cache = sharedSupplier.pipe(
concatMap(shared$ => shared$.pipe(
first(null, defer(() => (sharedSupplier.next(createShared()), EMPTY))),
mergeMap(d => isObservable(d) ? d : of(d))
)),
take(1)
)
但这是更多的代码,结果是一样的。
我正在尝试实现此处描述的内容:https://www.prestonlamb.com/blog/rxjs-cache-and-refresh-in-angular
换句话说,我想在给定时间(比如 1 分钟)内缓存一个可观察对象。当在给定时间之后进行订阅时,应再次检索数据并再次缓存 1 分钟。
预期结果示例:
T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:30: Request (8) => RETRIEVE data
shareReplay 运算符在给定时间缓存数据时工作正常,但我无法在给定时间过去后重新启动它。
使用 shareRelay(1, 1000) 运算符的示例:
T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => no response
T 01:15: Request (6) => no response
T 01:30: Request (7) => no response
T 02:30: Request (8) => no response
上面的 link 尝试使用第一个运算符捕获空结果来更改该行为。不幸的是,它不能正常工作,因为在第一次之后没有缓存数据。
这是我使用上面的文章得到的link(下图是描述使用的代码)
我得到的结果:
T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
T 01:15: Request (6) => RETRIEVE data
T 01:30: Request (7) => RETRIEVE data
T 02:30: Request (8) => RETRIEVE data
我也看过一些定时器运算符的例子,但在那种情况下,数据每分钟检索一次,即使没有订阅。我不想每分钟都刷新数据,我想每分钟都过期缓存。不幸的是,我丢失了定时器运算符的代码,但结果是这样的:
定时器运算符的结果:
T 00:00: Request (1) => RETRIEVE data
T 00:10: Request (2) => data from cache
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:00: NO REQUEST => RETRIEVE data
T 01:10: Request (5) => data from cache
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:00: NO REQUEST => RETRIEVE data
T 02:30: Request (8) => data from cache
谁有 "pure" RxJS 解决方案来做我想做的事?
我认为您提供的解决方案 link 有一个小错误,正如我试图在此 StackBlitz 中强调的那样。 (或者我可能误解了这个想法)
你可以试试这个:
const refetchSbj = new Subject();
const refetchData$ = refetchSbj.pipe(
switchMap(() => service.fetchData())
).pipe(share());
merge(
src$,
refetchData$
).pipe(
shareReplay(1, 1000),
buffer(concat(timer(0), refetchData$)),
tap(values => !values.length && refetchSbj.next()),
filter(values => values.length !== 0),
// In case there is only one value,
map(([v]) => v),
// Might want to add this, because each subscriber will receive the value emitted by the `shareReplay`
take(1)
)
shareReplay
内部使用 ReplaySubject
,它将所有缓存值 同步 发送给新订阅者。 timer(0)
类似于 setTimeout(fn, 0)
,但重要的方面是它是 异步的 ,它允许 buffer
收集 [=] 发出的值13=]。
buffer(concat(timer(0), refetchData$)),
- 我们要确保提供给 buffer
的内部可观察对象不会完成,否则整个流都会完成。 refetchData$
会在这种情况下发出新获取的数据(稍后我们会看到)。
tap(values => !values.length && refetchSbj.next())
- 如果没有发出任何值,则意味着正在使用的 ReplaySubject
没有任何值,这意味着时间已经过去。如果是这样,在 refetchSbj
的帮助下,我们可以重新填充缓存。
这就是我们可视化流程的方式:
T 00:00: Request (1) => RETRIEVE data
1) `refetchSbj.next()`
2) shareReplay will send the value resulted from `service.fetchData()` to the subscriber
3) the newly fetched value will be added to the `buffer`, and then the `refetchData$` from `concat(timer(0), refetchData$)` will emit(this is why we've used `share()`), meaning that `values` will not be an empty array
4) take(1) is reached, the value will be sent to the subscriber and then it will complete, so the `ReplaySubject` from `shareReplay()` will have no subscribers.
T 00:10: Request (2) => data from cache
`values` will not be empty, so `refetchSbj` won't emit and `take(1)` will be reached
T 00:35: Request (3) => data from cache
T 00:50: Request (4) => data from cache
T 01:10: Request (5) => RETRIEVE data
Same as `Request (1)`
T 01:15: Request (6) => data from cache
T 01:30: Request (7) => data from cache
T 02:30: Request (8) => RETRIEVE data
我会考虑以下策略。
首先你创建一个函数createCachedSource
,它returns一个Observable,它的缓存通过shareReplay(1)
实现。
然后我会使用这个函数来设置一个变量source$
,这是客户端必须用来订阅以获取数据的变量。
现在的诀窍是在所需的时间间隔重置 source$
,再次使用 createCachedSource
。
代码中的所有这些概念如下所示
// this function created an Observable cached via shareReplay(1)
const createCachedSource = () =>
of(1).pipe(
tap((d) => console.log(`Go out and fetch ${d}`)),
shareReplay(1),
tap((d) => console.log(`Return cached data ${d}`))
);
// source$ is the Observable the clients must subscribe to
let source$ = createCachedSource();
// every 1 sec reset the cache by creating a new Observable and setting it as value of source$
interval(1000)
.pipe(
take(10),
tap(() => (source$ = createCachedSource()))
)
.subscribe();
// here we simulate 30 subscriptions to source$, one every 300 mS
interval(300)
.pipe(
take(30),
switchMap(() => source$)
)
.subscribe();
Observable const shared$ = data$.pipe(shareReplay(1, 1000))
将缓存一个值 1000 毫秒。在那之后,它只会向未来的订阅者发送完整的通知(如果 data$
已完成)。通过检查 shared$
是否完成,您知道缓存已过期,您必须创建一个新的 shared$
Observable 供当前和未来的订阅者使用。
您可以使用更高阶的 Observable 来为您的订阅者提供那些 shared$
Observables。
const createShared = () => data$.pipe(shareReplay(1, 1000))
const sharedSupplier = new BehaviorSubject(createShared())
const cache = sharedSupplier.pipe(
concatMap(shared$ => shared$.pipe(
tap({ complete: () => sharedSupplier.next(createShared()) }),
)),
take(1) // emit once and complete so subscribers don't receive values from newly created shared Observables
)
https://stackblitz.com/edit/ggasua-2ntq7n
使用您发布的图像中的运算符,您还可以:
const cache = sharedSupplier.pipe(
concatMap(shared$ => shared$.pipe(
first(null, defer(() => (sharedSupplier.next(createShared()), EMPTY))),
mergeMap(d => isObservable(d) ? d : of(d))
)),
take(1)
)
但这是更多的代码,结果是一样的。