如何使用 RxJs 创建一个简单的缓存

How to create a simple cache using RxJs

我想使用 RxJs 创建缓存服务。想法很简单,我有一个 class 可以从服务器初始化中获取一些数据,还有一个 websocket 可以将通知推送到该服务(添加/编辑/删除实体)。

下面的代码是一种简化,让我明白如何实现我需要的。

const source = of(['World', 'test', 'azerty', 'uiop']);
const add$ = new Subject<string>();
const update$ = new Subject<string>();
const delete$ = new Subject<string>();

所以,真正的问题是:我如何创建一个高阶可观察对象,它会对来自 add$、update$ 和 delete$ 的任何更新作出反应?

我想要一个变量 'cache$',它是一个由源代码组成的可观察变量,添加更新和删除。并且最初由源流式传输的数据填充(这将是一个可观察到的 http 调用)。

你可以像这里那样做

const update$ = merge([
  add$.pipe(map(addItem => items => items.concat([addItem]))),
  delete$.pipe(map(deleteItem => items => items.filter(item => item !== addItem))),
  update$.pipe(map(updateItem => items => items.map(item => /* not very clear how exactly you want to update */))),
])
const cache$ = source.pipe(
  switchMap((startItems) => update$.pipe(scan((prevItems, changerFn) => changerFn(prevItems), startItems))),
  publish(),
);
cache$.connect(); 

好的,我想我找到了解决办法

import { combineLatest, Observable, of, Subject } from 'rxjs'; 
import { map, startWith } from 'rxjs/operators';

interface ITestPayload {
  id: number,
  name?: string,
}

interface ICacheEvent {
  eventType: 'INIT' | 'ADD' | 'EDIT' | 'DELETE',
  payload: ITestPayload,
}

const source: Observable<Array<ITestPayload>> = of([{id: 1, name: 'World'}, {id: 2, name: 'test'}, {id: 3, name: 'azerty'}]);
const event$ = new Subject<ICacheEvent>();

let cache$ = combineLatest([
  source,
  event$.pipe(startWith({eventType: 'INIT', payload: null})),
]).pipe(
  map(([source, event]) => {
  if (event.eventType === 'ADD'){
      const index = source.map(x => x.id).indexOf(event.payload.id);

      if(index === -1) {
        source.push(event.payload);
      }
    }
    else if (event.eventType === 'DELETE'){
      const index = source.map(x => x.id).indexOf(event.payload.id);

      if(index >= 0) {
        source.splice(index, 1);
      }
    }
    else if (event.eventType === 'EDIT'){
      const index = source.map(x => x.id).indexOf(event.payload.id);

      if(index >= 0) {
        source[index] = event.payload;
      }
    }

    return source;
  })
)

cache$.subscribe(x => console.log(x.map(item => item.name)));

event$.next({eventType: "ADD", payload: {id: 4, name: 'test 4'}})
event$.next({eventType: "EDIT", payload: {id: 2, name: 'value edited'}});
event$.next({eventType: "DELETE", payload: {id: 1}});

另一种解决方案(我的一位同事提出的)

import { combineLatest, merge,  Observable, of, Subject } from 'rxjs'; 
import { map, scan,  shareReplay,  startWith } from 'rxjs/operators';

interface ITestPayload {
  id: number,
  name?: string,
}

interface ICacheEvent {
  eventType: 'INIT' | 'ADD' | 'EDIT' | 'DELETE',
  payload: ITestPayload,
}

const source: Observable<Array<ITestPayload>> = of([{id: 1, name: 'World'}, {id: 2, name: 'test'}, {id: 3, name: 'azerty'}]);
const eventSubject$ = new Subject<ICacheEvent>();
const event$ = eventSubject$.asObservable();

const cache$: Observable<Array<ITestPayload>> = merge(
  source,
  event$,
).pipe(
  scan((acc: ITestPayload[], value: ICacheEvent) => {
    console.log('scan');
    if (value.eventType === 'ADD'){
      const index = acc.map(x => x.id).indexOf(value.payload.id);

      if(index === -1) {
        acc.push(value.payload);
      }
    }
    else if (value.eventType === 'DELETE'){
      const index = acc.map(x => x.id).indexOf(value.payload.id);

      if(index >= 0) {
        acc.splice(index, 1);
      }
    }
    else if (value.eventType === 'EDIT'){
      const index = acc.map(x => x.id).indexOf(value.payload.id);

      if(index >= 0) {
        acc[index] = value.payload;
      }
    }

    return acc;
  }))

cache$.subscribe(x => console.log('sub 0 :', x.map(item => item.name)));

eventSubject$.next({eventType: "ADD", payload: {id: 4, name: 'test 4'}})
eventSubject$.next({eventType: "EDIT", payload: {id: 2, name: 'value edited'}});
eventSubject$.next({eventType: "DELETE", payload: {id: 1}});