在 Angular2 中使用 RxJs 5 为 REST 客户端提供基于时间的缓存
Time-based cache for REST client using RxJs 5 in Angular2
我是 ReactiveX/RxJs 的新手,我想知道我的用例是否适用于 RxJs,最好是使用内置运算符的组合。这是我想要实现的目标:
我有一个与 REST API 通信的 Angular2 应用程序。应用程序的不同部分需要在不同时间访问相同的信息。为了避免一遍又一遍地触发相同的请求来破坏服务器,我想添加客户端缓存。缓存应该发生在实际进行网络调用的服务层中。然后这个服务层只分发 Observable
s。缓存必须对应用程序的其余部分透明:它应该只知道 Observable
s,而不是缓存。
因此,最初,来自 REST API 的特定信息应该每(比方说)60 秒检索一次,即使在这 60 秒内有十几个组件从服务请求此信息.每个订阅者必须在订阅时从 Observable 获得(单个)最后一个值。
目前,我设法通过这样的方法实现了这一目标:
public getInformation(): Observable<Information> {
if (!this.information) {
this.information = this.restService.get('/information/')
.cache(1, 60000);
}
return this.information;
}
在此示例中,restService.get(...)
执行实际的网络调用,returns 执行 Observable
,很像 Angular 的 http 服务。
这种方法的问题是刷新缓存:虽然它确保网络调用恰好执行一次,并且缓存值在 60 秒后将不再推送给新订阅者,但它不会重新缓存过期后执行初始请求。因此,在 60 秒缓存之后发生的订阅将不会从 Observable
.
中获得任何值
是否可以在缓存超时后,如果有新的订阅,重新执行初始请求,并重新缓存新值60秒?
作为奖励:如果现有订阅(例如发起第一个网络呼叫的订阅者)将获得由较新订阅发起获取的刷新值,那就更酷了,这样一旦信息被刷新,它立即通过整个 Observable 感知应用程序传递。
要实现这一点,您需要在订阅时使用自定义逻辑创建自己的可观察对象:
function createTimedCache(doRequest, expireTime) {
let lastCallTime = 0;
let lastResult = null;
const result$ = new Rx.Subject();
return Rx.Observable.create(observer => {
const time = Date.now();
if (time - lastCallTime < expireTime) {
return (lastResult
// when result already received
? result$.startWith(lastResult)
// still waiting for result
: result$
).subscribe(observer);
}
const disposable = result$.subscribe(observer);
lastCallTime = time;
lastResult = null;
doRequest()
.do(result => {
lastResult = result;
})
.subscribe(v => result$.next(v), e => result$.error(e));
return disposable;
});
}
最终的用法如下:
this.information = createTimedCache(
() => this.restService.get('/information/'),
60000
);
我想出了一个解决方案来实现我正在寻找的东西。它可能违背 ReactiveX 命名法和最佳实践,但从技术上讲,它完全符合我的要求。话虽这么说,如果有人仍然找到一种方法来仅使用内置运算符实现相同的目的,我将很乐意接受更好的答案。
所以基本上因为我需要一种方法来在订阅时重新触发网络调用(没有轮询,没有计时器),我研究了 ReplaySubject
是如何实现的,甚至将它用作我的基础 class。然后我创建了一个基于回调的 class RefreshingReplaySubject
(欢迎改进命名!)。这是:
export class RefreshingReplaySubject<T> extends ReplaySubject<T> {
private providerCallback: () => Observable<T>;
private lastProviderTrigger: number;
private windowTime;
constructor(providerCallback: () => Observable<T>, windowTime?: number) {
// Cache exactly 1 item forever in the ReplaySubject
super(1);
this.windowTime = windowTime || 60000;
this.lastProviderTrigger = 0;
this.providerCallback = providerCallback;
}
protected _subscribe(subscriber: Subscriber<T>): Subscription {
// Hook into the subscribe method to trigger refreshing
this._triggerProviderIfRequired();
return super._subscribe(subscriber);
}
protected _triggerProviderIfRequired() {
let now = this._getNow();
if ((now - this.lastProviderTrigger) > this.windowTime) {
// Data considered stale, provider triggering required...
this.lastProviderTrigger = now;
this.providerCallback().first().subscribe((t: T) => this.next(t));
}
}
}
这是最终的用法:
public getInformation(): Observable<Information> {
if (!this.information) {
this.information = new RefreshingReplaySubject(
() => this.restService.get('/information/'),
60000
);
}
return this.information;
}
我是 ReactiveX/RxJs 的新手,我想知道我的用例是否适用于 RxJs,最好是使用内置运算符的组合。这是我想要实现的目标:
我有一个与 REST API 通信的 Angular2 应用程序。应用程序的不同部分需要在不同时间访问相同的信息。为了避免一遍又一遍地触发相同的请求来破坏服务器,我想添加客户端缓存。缓存应该发生在实际进行网络调用的服务层中。然后这个服务层只分发 Observable
s。缓存必须对应用程序的其余部分透明:它应该只知道 Observable
s,而不是缓存。
因此,最初,来自 REST API 的特定信息应该每(比方说)60 秒检索一次,即使在这 60 秒内有十几个组件从服务请求此信息.每个订阅者必须在订阅时从 Observable 获得(单个)最后一个值。
目前,我设法通过这样的方法实现了这一目标:
public getInformation(): Observable<Information> {
if (!this.information) {
this.information = this.restService.get('/information/')
.cache(1, 60000);
}
return this.information;
}
在此示例中,restService.get(...)
执行实际的网络调用,returns 执行 Observable
,很像 Angular 的 http 服务。
这种方法的问题是刷新缓存:虽然它确保网络调用恰好执行一次,并且缓存值在 60 秒后将不再推送给新订阅者,但它不会重新缓存过期后执行初始请求。因此,在 60 秒缓存之后发生的订阅将不会从 Observable
.
是否可以在缓存超时后,如果有新的订阅,重新执行初始请求,并重新缓存新值60秒?
作为奖励:如果现有订阅(例如发起第一个网络呼叫的订阅者)将获得由较新订阅发起获取的刷新值,那就更酷了,这样一旦信息被刷新,它立即通过整个 Observable 感知应用程序传递。
要实现这一点,您需要在订阅时使用自定义逻辑创建自己的可观察对象:
function createTimedCache(doRequest, expireTime) {
let lastCallTime = 0;
let lastResult = null;
const result$ = new Rx.Subject();
return Rx.Observable.create(observer => {
const time = Date.now();
if (time - lastCallTime < expireTime) {
return (lastResult
// when result already received
? result$.startWith(lastResult)
// still waiting for result
: result$
).subscribe(observer);
}
const disposable = result$.subscribe(observer);
lastCallTime = time;
lastResult = null;
doRequest()
.do(result => {
lastResult = result;
})
.subscribe(v => result$.next(v), e => result$.error(e));
return disposable;
});
}
最终的用法如下:
this.information = createTimedCache(
() => this.restService.get('/information/'),
60000
);
我想出了一个解决方案来实现我正在寻找的东西。它可能违背 ReactiveX 命名法和最佳实践,但从技术上讲,它完全符合我的要求。话虽这么说,如果有人仍然找到一种方法来仅使用内置运算符实现相同的目的,我将很乐意接受更好的答案。
所以基本上因为我需要一种方法来在订阅时重新触发网络调用(没有轮询,没有计时器),我研究了 ReplaySubject
是如何实现的,甚至将它用作我的基础 class。然后我创建了一个基于回调的 class RefreshingReplaySubject
(欢迎改进命名!)。这是:
export class RefreshingReplaySubject<T> extends ReplaySubject<T> {
private providerCallback: () => Observable<T>;
private lastProviderTrigger: number;
private windowTime;
constructor(providerCallback: () => Observable<T>, windowTime?: number) {
// Cache exactly 1 item forever in the ReplaySubject
super(1);
this.windowTime = windowTime || 60000;
this.lastProviderTrigger = 0;
this.providerCallback = providerCallback;
}
protected _subscribe(subscriber: Subscriber<T>): Subscription {
// Hook into the subscribe method to trigger refreshing
this._triggerProviderIfRequired();
return super._subscribe(subscriber);
}
protected _triggerProviderIfRequired() {
let now = this._getNow();
if ((now - this.lastProviderTrigger) > this.windowTime) {
// Data considered stale, provider triggering required...
this.lastProviderTrigger = now;
this.providerCallback().first().subscribe((t: T) => this.next(t));
}
}
}
这是最终的用法:
public getInformation(): Observable<Information> {
if (!this.information) {
this.information = new RefreshingReplaySubject(
() => this.restService.get('/information/'),
60000
);
}
return this.information;
}