RxJS 缓存并使用 shareReplay 刷新

RxJS cache and refresh with shareReplay

我正在对从 API 检索到的一些数据使用缓存,出于逻辑原因,存储的数据仅在有限的时间内有效,所以我正在使用类似的东西:

someApiData$ = this.getData()
    .pipe(shareReplay(1, 3000))

对我来说似乎很明显但对 shareReplay 运算符的创建者来说显然不是的是,如果数据不再被缓存,则应该重新获取它,或者至少我应该有另一个参数可以给我这个选项,比如:

someApiData$ = this.getData()
    .pipe(shareReplay(1, 3000, shouldRefresh))

相反,下一个订阅者将得到的是空值。 所以,我正在寻找一个优雅的解决方案来解决这个问题。

根据 documentationshareReplay 运算符的 window 参数不是这样工作的:

the age, in milliseconds, at which items in this buffer may be discarded without being emitted to subsequent observers

在您的代码示例中,这意味着 3 秒后新订阅者将不会收到任何东西。

我认为最好的处理方法是使用外部计数器来处理它:

  private cache$: Observable<any>;
  private lastTime: number;

  public getCachedData() {
    if (!this.cache$ || new Date().getTime() - this.lastTime > 3000) {
      this.cache$ = this.getData().pipe(shareReplay(1));
      this.lastTime = new Date().getTime();
    }

    return this.cache$;
  }

每当新订阅者调用 getCachedData() 时,此代码将“重新创建”Observable。

但是,较旧的订阅者不会获得新的重新创建的 Observable 的更新。为了使所有这些保持同步,您可能需要使用 BehaviorSubject 来存储数据:

  // Everybody subscribe to this Subject
  private data$ = new BehaviorSubject(null);

  public getCachedData() {
    // TODO check time expiration here and call this.refreshData();
    if(timeExpired) {
      return this.refreshData().pipe(
        mergeMap(data => {
          return this.data$.asObservable();
        })
      );
    } else {
      return this.data$.asObservable();
    }
  }
  
  private refreshData() {
    return this.getData().pipe(
      tap(data => {
        this.data$.next(data);
      })
    );
  }

以上方案只是一个思路,有待改进和测试。

这是一种方法:

const URL = 'https://jsonplaceholder.typicode.com/todos/1';

const notifier = new Subject();
const pending = new BehaviorSubject(false);

const cacheEmpty = Symbol('cache empty')

const shared$ = notifier.pipe(
  withLatestFrom(pending),
  filter(([_, isPending]) => isPending === false),

  switchMap(() => (
    console.warn('[FETCHING DATA]'),
    pending.next(true),
    fetch(URL).then(r => r.json())
  )),

  tap(() => pending.next(false)),
  shareReplay(1, 1000),
);

const src$ = shared$.pipe(
  mergeWith(of(cacheEmpty).pipe(delay(0), takeUntil(shared$))),
  tap(v => v === cacheEmpty && notifier.next()),
  filter(v => v !== cacheEmpty)
)

src$.subscribe(v => console.log('[1]', v));

setTimeout(() => {
  src$.subscribe(v => console.log('[2]', v));
}, 500);

setTimeout(() => {
  src$.subscribe(v => console.log('[3]', v));
}, 1200);

StackBlitz.

mergeWithimport { merge as mergeWith } from 'rxjs/operators'(我认为从 RxJs 7 开始,它可以直接作为 mergeWith 访问)。

我的理由是我需要找到一种方法来确定正在使用的 ReplaySubject 的缓存是否为空。已知如果缓存不为空并且有新的订阅者到来,它将同步.

发送缓存值

所以,

mergeWith(of(cacheEmpty).pipe(delay(0), takeUntil(shared$))),

基本相同
merge(
  shared$,
  of(cacheEmpty).pipe(delay(0), takeUntil(shared$)) // #2
)

如果缓存中有值,shared$ 将发出并取消订阅 #2

如果没有值,#2 将发出并完成(它完成的事实不会影响外部可观察对象)。

接下来,我们看到如果 cacheEmpty 已经发出,那么我们知道是时候刷新数据了。

tap(v => v === cacheEmpty && notifier.next()), // `notifier.next()` -> time to refresh
filter(v => v !== cacheEmpty)

现在,让我们来看看notifier是如何工作的

const shared$ = notifier.pipe(
  
  // These 2 operators + `pending` make sure that if 2 subscribers register one after the other, thus synchronously
  // the source won't be subscribed more than needed
  withLatestFrom(pending),
  filter(([_, isPending]) => isPending === false),

  switchMap(() => (
    console.warn('[FETCHING DATA]'),
    pending.next(true), // If a new subscriber registers while the request is pending, the source won't be requested twice
    fetch(URL).then(r => r.json())
  )),

  // The request has finished, we have the new data
  tap(() => pending.next(false)),

  shareReplay(1, 1000),
);

我有一个类似的用例,最终使用了以下自定义运算符。

import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';

export const cacheValue = <T>(windowTime: (value: T) => number) => (
  source: Observable<T>,
) => {
  let cache: { value: T; expires: number } | undefined = undefined;
  return new Observable<T>((observer) => {
    if (cache && cache.expires > Date.now()) {
      observer.next(cache.value);
      observer.complete();
    } else {
      return source
        .pipe(
          tap(
            (value) =>
              (cache = { value, expires: Date.now() + windowTime(value) }),
          ),
        )
        .subscribe(observer);
    }
  });
};

如果你的缓存在 100 毫秒后过期,你会称它为 cacheValue(() => 100),如果 API 返回的值有一个 expiresIn 属性,你' d 称它为 cacheValue((value) => value.expiresIn).

您可以更改 Stream 的启动方式,在本例中使用 interval 创建一个立即发出的流,然后在满足间隔时使用它来触发数据加载。

当您第一次订阅流时,触发间隔,加载数据,然后在三秒后再次触发。

import { interval } from 'rxjs';

const interval$ = interval(3000); // emits straight away then every 3 seconds

interval$ 发出时,使用 switchMap to switch the Observable out and shareReplay 允许多播。

// previous import
import { switchMap, shareReplay } from 'rxjs/operators';

// previous code

const data$ = interval$.pipe(
    switchMap(() => getData()),
    shareReplay()
);

您还可以将 interval$ 包装在 merge 中,这样您就可以创建基于 Subject 的手动刷新,就像您的 interval.

import { BehaviorSubject, merge, interval } from "rxjs";
import { shareReplay, switchMap } from "rxjs/operators";

const interval$ = interval(3000);
const reloadCacheSubject = new BehaviorSubject(null);

const data$ = merge(reloadCacheSubject, interval$).pipe(
  switchMap(() => getData()),
  shareReplay()
);

reloadCacheSubject.next(null); // causes a reload

StackBlitz mergerefreshCache Subject

示例

在对这个线程的答案和网络上的一些其他方法进行了一些追踪之后,这就是我最终得到的。它提供了以下能力:

  1. 缓存值
  2. 如果数据不再缓存,则自动刷新值
  3. 直接与 Observable
  4. 合作
  5. 如果需要,指定缓存生命周期的持续时间
  6. 整理我的服务并提供可重复使用的解决方案

我的缓存工具:

export class SharedReplayRefresh {

    private sharedReplay$: Observable<T>;
    private subscriptionTime: number;


   sharedReplayTimerRefresh(
       source: Observable<T>, bufferSize: number = 1,
       windowTime: number = 3000000,  scheduler?: SchedulerLike): Observable<T> {

        const currentTime = new Date().getTime();
        if (!this.sharedReplay$ || 
            currentTime - this.subscriptionTime > windowTime) {
            this.sharedReplay$ = source.pipe(shareReplay(
                bufferSize, windowTime, scheduler));
            this.subscriptionTime = currentTime;
        }

        return this.sharedReplay$;
    }
}

我的数据服务:

export class DataService {
    
    constructor(private httpClient: HttpClient) { }

    private dataSource = 
        new SharedReplayRefresh<Data>();
    private source = this.httpClient.get<Data>(url);
    
    get data$(): Observable<Data> {
        return this.dataSource .sharedReplayTimerRefresh(this.source, 1, 1500);
    }
}

老问题,但我和你有同样的错误。意识到如果“api 数据”为空,我只需要回退到 api 调用。新代码以粗体显示。

someApiData$ = someApiData$ || this.getData().pipe(shareReplay(1, 3000))