在 Angular2 中使用 RxJs 5 为 REST 客户端提供基于时间的缓存

Time-based cache for REST client using RxJs 5 in Angular2

我是 ReactiveX/RxJs 的新手,我想知道我的用例是否适用于 RxJs,最好是使用内置运算符的组合。这是我想要实现的目标:

我有一个与 REST API 通信的 Angular2 应用程序。应用程序的不同部分需要在不同时间访问相同的信息。为了避免一遍又一遍地触发相同的请求来破坏服务器,我想添加客户端缓存。缓存应该发生在实际进行网络调用的服务层中。然后这个服务层只分发 Observables。缓存必须对应用程序的其余部分透明:它应该只知道 Observables,而不是缓存。

因此,最初,来自 REST API 的特定信息应该每(比方说)60 秒检索一次,即使在这 60 秒内有十几个组件从服务请求此信息.每个订阅者必须在订阅时从 Observable 获得(单个)最后一个值。

目前,我设法通过这样的方法实现了这一目标:

public getInformation(): Observable<Information> {
  if (!this.information) {
    this.information = this.restService.get('/information/')
      .cache(1, 60000);
  }
  return this.information;
}

在此示例中,restService.get(...) 执行实际的网络调用,returns 执行 Observable,很像 Angular 的 http 服务。

这种方法的问题是刷新缓存:虽然它确保网络调用恰好执行一次,并且缓存值在 60​​ 秒后将不再推送给新订阅者,但它不会重新缓存过期后执行初始请求。因此,在 60 秒缓存之后发生的订阅将不会从 Observable.

中获得任何值

是否可以在缓存超时后,如果有新的订阅,重新执行初始请求,并重新缓存新值60秒?

作为奖励:如果现有订阅(例如发起第一个网络呼叫的订阅者)将获得由较新订阅发起获取的刷新值,那就更酷了,这样一旦信息被刷新,它立即通过整个 Observable 感知应用程序传递。

要实现这一点,您需要在订阅时使用自定义逻辑创建自己的可观察对象:

function createTimedCache(doRequest, expireTime) {
    let lastCallTime = 0;
    let lastResult = null;

    const result$ = new Rx.Subject();

    return Rx.Observable.create(observer => {
        const time = Date.now();
        if (time - lastCallTime < expireTime) {
            return (lastResult
                // when result already received
                ? result$.startWith(lastResult)
                // still waiting for result
                : result$
            ).subscribe(observer);
        }
        const disposable = result$.subscribe(observer);
        lastCallTime = time;
        lastResult = null;
        doRequest()
            .do(result => {
                lastResult = result;
            })
            .subscribe(v => result$.next(v), e => result$.error(e));
        return disposable;
    });
}

最终的用法如下:

this.information = createTimedCache(
  () => this.restService.get('/information/'),
  60000
);

用法示例:https://jsbin.com/hutikesoqa/edit?js,console

我想出了一个解决方案来实现我正在寻找的东西。它可能违背 ReactiveX 命名法和最佳实践,但从技术上讲,它完全符合我的要求。话虽这么说,如果有人仍然找到一种方法来仅使用内置运算符实现相同的目的,我将很乐意接受更好的答案。

所以基本上因为我需要一种方法来在订阅时重新触发网络调用(没有轮询,没有计时器),我研究了 ReplaySubject 是如何实现的,甚至将它用作我的基础 class。然后我创建了一个基于回调的 class RefreshingReplaySubject(欢迎改进命名!)。这是:

export class RefreshingReplaySubject<T> extends ReplaySubject<T> {

  private providerCallback: () => Observable<T>;
  private lastProviderTrigger: number;
  private windowTime;

  constructor(providerCallback: () => Observable<T>, windowTime?: number) {
    // Cache exactly 1 item forever in the ReplaySubject
    super(1);
    this.windowTime = windowTime || 60000;
    this.lastProviderTrigger = 0;
    this.providerCallback = providerCallback;
  }

  protected _subscribe(subscriber: Subscriber<T>): Subscription {
    // Hook into the subscribe method to trigger refreshing
    this._triggerProviderIfRequired();
    return super._subscribe(subscriber);
  }

  protected _triggerProviderIfRequired() {
    let now = this._getNow();
    if ((now - this.lastProviderTrigger) > this.windowTime) {
      // Data considered stale, provider triggering required...
      this.lastProviderTrigger = now;
      this.providerCallback().first().subscribe((t: T) => this.next(t));
    }
  }
}

这是最终的用法:

public getInformation(): Observable<Information> {
  if (!this.information) {
    this.information = new RefreshingReplaySubject(
      () => this.restService.get('/information/'),
      60000
    );
  }
  return this.information;
}