RxJS 缓存并使用 shareReplay 刷新
RxJS cache and refresh with shareReplay
我正在对从 API 检索到的一些数据使用缓存,出于逻辑原因,存储的数据仅在有限的时间内有效,所以我正在使用类似的东西:
someApiData$ = this.getData()
.pipe(shareReplay(1, 3000))
对我来说似乎很明显但对 shareReplay
运算符的创建者来说显然不是的是,如果数据不再被缓存,则应该重新获取它,或者至少我应该有另一个参数可以给我这个选项,比如:
someApiData$ = this.getData()
.pipe(shareReplay(1, 3000, shouldRefresh))
相反,下一个订阅者将得到的是空值。
所以,我正在寻找一个优雅的解决方案来解决这个问题。
根据 documentation,shareReplay
运算符的 window
参数不是这样工作的:
the age, in milliseconds, at which items in this buffer may be discarded without being emitted to subsequent observers
在您的代码示例中,这意味着 3 秒后新订阅者将不会收到任何东西。
我认为最好的处理方法是使用外部计数器来处理它:
private cache$: Observable<any>;
private lastTime: number;
public getCachedData() {
if (!this.cache$ || new Date().getTime() - this.lastTime > 3000) {
this.cache$ = this.getData().pipe(shareReplay(1));
this.lastTime = new Date().getTime();
}
return this.cache$;
}
每当新订阅者调用 getCachedData()
时,此代码将“重新创建”Observable。
但是,较旧的订阅者不会获得新的重新创建的 Observable 的更新。为了使所有这些保持同步,您可能需要使用 BehaviorSubject
来存储数据:
// Everybody subscribe to this Subject
private data$ = new BehaviorSubject(null);
public getCachedData() {
// TODO check time expiration here and call this.refreshData();
if(timeExpired) {
return this.refreshData().pipe(
mergeMap(data => {
return this.data$.asObservable();
})
);
} else {
return this.data$.asObservable();
}
}
private refreshData() {
return this.getData().pipe(
tap(data => {
this.data$.next(data);
})
);
}
以上方案只是一个思路,有待改进和测试。
这是一种方法:
const URL = 'https://jsonplaceholder.typicode.com/todos/1';
const notifier = new Subject();
const pending = new BehaviorSubject(false);
const cacheEmpty = Symbol('cache empty')
const shared$ = notifier.pipe(
withLatestFrom(pending),
filter(([_, isPending]) => isPending === false),
switchMap(() => (
console.warn('[FETCHING DATA]'),
pending.next(true),
fetch(URL).then(r => r.json())
)),
tap(() => pending.next(false)),
shareReplay(1, 1000),
);
const src$ = shared$.pipe(
mergeWith(of(cacheEmpty).pipe(delay(0), takeUntil(shared$))),
tap(v => v === cacheEmpty && notifier.next()),
filter(v => v !== cacheEmpty)
)
src$.subscribe(v => console.log('[1]', v));
setTimeout(() => {
src$.subscribe(v => console.log('[2]', v));
}, 500);
setTimeout(() => {
src$.subscribe(v => console.log('[3]', v));
}, 1200);
mergeWith
是 import { merge as mergeWith } from 'rxjs/operators'
(我认为从 RxJs 7 开始,它可以直接作为 mergeWith
访问)。
我的理由是我需要找到一种方法来确定正在使用的 ReplaySubject
的缓存是否为空。已知如果缓存不为空并且有新的订阅者到来,它将同步.
发送缓存值
所以,
mergeWith(of(cacheEmpty).pipe(delay(0), takeUntil(shared$))),
与
基本相同
merge(
shared$,
of(cacheEmpty).pipe(delay(0), takeUntil(shared$)) // #2
)
如果缓存中有值,shared$
将发出并取消订阅 #2
。
如果没有值,#2
将发出并完成(它完成的事实不会影响外部可观察对象)。
接下来,我们看到如果 cacheEmpty
已经发出,那么我们知道是时候刷新数据了。
tap(v => v === cacheEmpty && notifier.next()), // `notifier.next()` -> time to refresh
filter(v => v !== cacheEmpty)
现在,让我们来看看notifier
是如何工作的
const shared$ = notifier.pipe(
// These 2 operators + `pending` make sure that if 2 subscribers register one after the other, thus synchronously
// the source won't be subscribed more than needed
withLatestFrom(pending),
filter(([_, isPending]) => isPending === false),
switchMap(() => (
console.warn('[FETCHING DATA]'),
pending.next(true), // If a new subscriber registers while the request is pending, the source won't be requested twice
fetch(URL).then(r => r.json())
)),
// The request has finished, we have the new data
tap(() => pending.next(false)),
shareReplay(1, 1000),
);
我有一个类似的用例,最终使用了以下自定义运算符。
import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';
export const cacheValue = <T>(windowTime: (value: T) => number) => (
source: Observable<T>,
) => {
let cache: { value: T; expires: number } | undefined = undefined;
return new Observable<T>((observer) => {
if (cache && cache.expires > Date.now()) {
observer.next(cache.value);
observer.complete();
} else {
return source
.pipe(
tap(
(value) =>
(cache = { value, expires: Date.now() + windowTime(value) }),
),
)
.subscribe(observer);
}
});
};
如果你的缓存在 100 毫秒后过期,你会称它为 cacheValue(() => 100)
,如果 API 返回的值有一个 expiresIn
属性,你' d 称它为 cacheValue((value) => value.expiresIn)
.
您可以更改 Stream 的启动方式,在本例中使用 interval
创建一个立即发出的流,然后在满足间隔时使用它来触发数据加载。
当您第一次订阅流时,触发间隔,加载数据,然后在三秒后再次触发。
import { interval } from 'rxjs';
const interval$ = interval(3000); // emits straight away then every 3 seconds
当 interval$
发出时,使用 switchMap
to switch the Observable
out and shareReplay
允许多播。
// previous import
import { switchMap, shareReplay } from 'rxjs/operators';
// previous code
const data$ = interval$.pipe(
switchMap(() => getData()),
shareReplay()
);
您还可以将 interval$ 包装在 merge
中,这样您就可以创建基于 Subject
的手动刷新,就像您的 interval
.
import { BehaviorSubject, merge, interval } from "rxjs";
import { shareReplay, switchMap } from "rxjs/operators";
const interval$ = interval(3000);
const reloadCacheSubject = new BehaviorSubject(null);
const data$ = merge(reloadCacheSubject, interval$).pipe(
switchMap(() => getData()),
shareReplay()
);
reloadCacheSubject.next(null); // causes a reload
StackBlitz merge
和 refreshCache
Subject
示例
在对这个线程的答案和网络上的一些其他方法进行了一些追踪之后,这就是我最终得到的。它提供了以下能力:
- 缓存值
- 如果数据不再缓存,则自动刷新值
- 直接与
Observable
合作
- 如果需要,指定缓存生命周期的持续时间
- 整理我的服务并提供可重复使用的解决方案
我的缓存工具:
export class SharedReplayRefresh {
private sharedReplay$: Observable<T>;
private subscriptionTime: number;
sharedReplayTimerRefresh(
source: Observable<T>, bufferSize: number = 1,
windowTime: number = 3000000, scheduler?: SchedulerLike): Observable<T> {
const currentTime = new Date().getTime();
if (!this.sharedReplay$ ||
currentTime - this.subscriptionTime > windowTime) {
this.sharedReplay$ = source.pipe(shareReplay(
bufferSize, windowTime, scheduler));
this.subscriptionTime = currentTime;
}
return this.sharedReplay$;
}
}
我的数据服务:
export class DataService {
constructor(private httpClient: HttpClient) { }
private dataSource =
new SharedReplayRefresh<Data>();
private source = this.httpClient.get<Data>(url);
get data$(): Observable<Data> {
return this.dataSource .sharedReplayTimerRefresh(this.source, 1, 1500);
}
}
老问题,但我和你有同样的错误。意识到如果“api 数据”为空,我只需要回退到 api 调用。新代码以粗体显示。
someApiData$ = someApiData$ || this.getData().pipe(shareReplay(1, 3000))
我正在对从 API 检索到的一些数据使用缓存,出于逻辑原因,存储的数据仅在有限的时间内有效,所以我正在使用类似的东西:
someApiData$ = this.getData()
.pipe(shareReplay(1, 3000))
对我来说似乎很明显但对 shareReplay
运算符的创建者来说显然不是的是,如果数据不再被缓存,则应该重新获取它,或者至少我应该有另一个参数可以给我这个选项,比如:
someApiData$ = this.getData()
.pipe(shareReplay(1, 3000, shouldRefresh))
相反,下一个订阅者将得到的是空值。 所以,我正在寻找一个优雅的解决方案来解决这个问题。
根据 documentation,shareReplay
运算符的 window
参数不是这样工作的:
the age, in milliseconds, at which items in this buffer may be discarded without being emitted to subsequent observers
在您的代码示例中,这意味着 3 秒后新订阅者将不会收到任何东西。
我认为最好的处理方法是使用外部计数器来处理它:
private cache$: Observable<any>;
private lastTime: number;
public getCachedData() {
if (!this.cache$ || new Date().getTime() - this.lastTime > 3000) {
this.cache$ = this.getData().pipe(shareReplay(1));
this.lastTime = new Date().getTime();
}
return this.cache$;
}
每当新订阅者调用 getCachedData()
时,此代码将“重新创建”Observable。
但是,较旧的订阅者不会获得新的重新创建的 Observable 的更新。为了使所有这些保持同步,您可能需要使用 BehaviorSubject
来存储数据:
// Everybody subscribe to this Subject
private data$ = new BehaviorSubject(null);
public getCachedData() {
// TODO check time expiration here and call this.refreshData();
if(timeExpired) {
return this.refreshData().pipe(
mergeMap(data => {
return this.data$.asObservable();
})
);
} else {
return this.data$.asObservable();
}
}
private refreshData() {
return this.getData().pipe(
tap(data => {
this.data$.next(data);
})
);
}
以上方案只是一个思路,有待改进和测试。
这是一种方法:
const URL = 'https://jsonplaceholder.typicode.com/todos/1';
const notifier = new Subject();
const pending = new BehaviorSubject(false);
const cacheEmpty = Symbol('cache empty')
const shared$ = notifier.pipe(
withLatestFrom(pending),
filter(([_, isPending]) => isPending === false),
switchMap(() => (
console.warn('[FETCHING DATA]'),
pending.next(true),
fetch(URL).then(r => r.json())
)),
tap(() => pending.next(false)),
shareReplay(1, 1000),
);
const src$ = shared$.pipe(
mergeWith(of(cacheEmpty).pipe(delay(0), takeUntil(shared$))),
tap(v => v === cacheEmpty && notifier.next()),
filter(v => v !== cacheEmpty)
)
src$.subscribe(v => console.log('[1]', v));
setTimeout(() => {
src$.subscribe(v => console.log('[2]', v));
}, 500);
setTimeout(() => {
src$.subscribe(v => console.log('[3]', v));
}, 1200);
mergeWith
是 import { merge as mergeWith } from 'rxjs/operators'
(我认为从 RxJs 7 开始,它可以直接作为 mergeWith
访问)。
我的理由是我需要找到一种方法来确定正在使用的 ReplaySubject
的缓存是否为空。已知如果缓存不为空并且有新的订阅者到来,它将同步.
所以,
mergeWith(of(cacheEmpty).pipe(delay(0), takeUntil(shared$))),
与
基本相同merge(
shared$,
of(cacheEmpty).pipe(delay(0), takeUntil(shared$)) // #2
)
如果缓存中有值,shared$
将发出并取消订阅 #2
。
如果没有值,#2
将发出并完成(它完成的事实不会影响外部可观察对象)。
接下来,我们看到如果 cacheEmpty
已经发出,那么我们知道是时候刷新数据了。
tap(v => v === cacheEmpty && notifier.next()), // `notifier.next()` -> time to refresh
filter(v => v !== cacheEmpty)
现在,让我们来看看notifier
是如何工作的
const shared$ = notifier.pipe(
// These 2 operators + `pending` make sure that if 2 subscribers register one after the other, thus synchronously
// the source won't be subscribed more than needed
withLatestFrom(pending),
filter(([_, isPending]) => isPending === false),
switchMap(() => (
console.warn('[FETCHING DATA]'),
pending.next(true), // If a new subscriber registers while the request is pending, the source won't be requested twice
fetch(URL).then(r => r.json())
)),
// The request has finished, we have the new data
tap(() => pending.next(false)),
shareReplay(1, 1000),
);
我有一个类似的用例,最终使用了以下自定义运算符。
import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';
export const cacheValue = <T>(windowTime: (value: T) => number) => (
source: Observable<T>,
) => {
let cache: { value: T; expires: number } | undefined = undefined;
return new Observable<T>((observer) => {
if (cache && cache.expires > Date.now()) {
observer.next(cache.value);
observer.complete();
} else {
return source
.pipe(
tap(
(value) =>
(cache = { value, expires: Date.now() + windowTime(value) }),
),
)
.subscribe(observer);
}
});
};
如果你的缓存在 100 毫秒后过期,你会称它为 cacheValue(() => 100)
,如果 API 返回的值有一个 expiresIn
属性,你' d 称它为 cacheValue((value) => value.expiresIn)
.
您可以更改 Stream 的启动方式,在本例中使用 interval
创建一个立即发出的流,然后在满足间隔时使用它来触发数据加载。
当您第一次订阅流时,触发间隔,加载数据,然后在三秒后再次触发。
import { interval } from 'rxjs';
const interval$ = interval(3000); // emits straight away then every 3 seconds
当 interval$
发出时,使用 switchMap
to switch the Observable
out and shareReplay
允许多播。
// previous import
import { switchMap, shareReplay } from 'rxjs/operators';
// previous code
const data$ = interval$.pipe(
switchMap(() => getData()),
shareReplay()
);
您还可以将 interval$ 包装在 merge
中,这样您就可以创建基于 Subject
的手动刷新,就像您的 interval
.
import { BehaviorSubject, merge, interval } from "rxjs";
import { shareReplay, switchMap } from "rxjs/operators";
const interval$ = interval(3000);
const reloadCacheSubject = new BehaviorSubject(null);
const data$ = merge(reloadCacheSubject, interval$).pipe(
switchMap(() => getData()),
shareReplay()
);
reloadCacheSubject.next(null); // causes a reload
StackBlitz merge
和 refreshCache
Subject
在对这个线程的答案和网络上的一些其他方法进行了一些追踪之后,这就是我最终得到的。它提供了以下能力:
- 缓存值
- 如果数据不再缓存,则自动刷新值
- 直接与
Observable
合作
- 如果需要,指定缓存生命周期的持续时间
- 整理我的服务并提供可重复使用的解决方案
我的缓存工具:
export class SharedReplayRefresh {
private sharedReplay$: Observable<T>;
private subscriptionTime: number;
sharedReplayTimerRefresh(
source: Observable<T>, bufferSize: number = 1,
windowTime: number = 3000000, scheduler?: SchedulerLike): Observable<T> {
const currentTime = new Date().getTime();
if (!this.sharedReplay$ ||
currentTime - this.subscriptionTime > windowTime) {
this.sharedReplay$ = source.pipe(shareReplay(
bufferSize, windowTime, scheduler));
this.subscriptionTime = currentTime;
}
return this.sharedReplay$;
}
}
我的数据服务:
export class DataService {
constructor(private httpClient: HttpClient) { }
private dataSource =
new SharedReplayRefresh<Data>();
private source = this.httpClient.get<Data>(url);
get data$(): Observable<Data> {
return this.dataSource .sharedReplayTimerRefresh(this.source, 1, 1500);
}
}
老问题,但我和你有同样的错误。意识到如果“api 数据”为空,我只需要回退到 api 调用。新代码以粗体显示。
someApiData$ = someApiData$ || this.getData().pipe(shareReplay(1, 3000))