How to create a simple cache using RxJs
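I want to create a cache service using RxJs. The idea is simple: I have a class that fetches some data from the server on initialization, and a websocket that pushes notifications to the service (add/edit/delete entity).
The code below is a simplification, to help me understand how to implement what I need.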
const source = of(['World', 'test', 'azerty', 'uiop']);
const add$ = new Subject<string>();
const update$ = new Subject<string>();
const delete$ = new Subject<string>();
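So the real question is: how can I create a higher-order observable that reacts to any update coming from add$, update$ and delete$?
I want a variable 'cache$' that is an observable composed of the source plus the adds, updates and deletes, and that is initially populated with the data emitted by the source (which will be an observable HTTP call).
You can do it like this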
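// Merge the three notification streams into one stream of "changer" functions.
// Named changes$ so it does not clash with the update$ Subject declared above.
// merge takes the observables as separate arguments, not as an array.
const changes$ = merge(
add$.pipe(map(addItem => items => items.concat([addItem]))),
delete$.pipe(map(deleteItem => items => items.filter(item => item !== deleteItem))),
update$.pipe(map(updateItem => items => items.map(item => item /* not very clear how exactly you want to update */))),
);
// Once the source has emitted the initial items, fold every changer function over them.
const cache$ = source.pipe(
switchMap((startItems) => changes$.pipe(scan((prevItems, changerFn) => changerFn(prevItems), startItems))),
publish(),
);
cache$.connect();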
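The idea behind the snippet above is that every notification becomes a function from the old list to the new list, and scan applies those functions one after another starting from the initial items. As a minimal sketch of just that folding step, here it is without RxJS, with Array.prototype.reduce standing in for scan. The names Changer, addItem, deleteItem and the sample value 'qsdf' are assumptions made for illustration; the update case is left out because the question does not say how an update should be applied.

```typescript
// A "changer" takes the current list and returns the next one.
type Changer = (items: string[]) => string[];

// Hypothetical helpers mirroring the add$ and delete$ mappings.
const addItem = (item: string): Changer => items => items.concat([item]);
const deleteItem = (item: string): Changer => items => items.filter(i => i !== item);

const startItems = ['World', 'test', 'azerty', 'uiop'];
const changes: Changer[] = [addItem('qsdf'), deleteItem('test')];

// reduce plays the role of scan: apply each changer to the previous state.
const finalItems = changes.reduce((prev, change) => change(prev), startItems);

console.log(finalItems); // → [ 'World', 'azerty', 'uiop', 'qsdf' ]
```

Because concat and filter return new arrays, startItems itself is never modified.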
OK, I think I found a solution
import { combineLatest, Observable, of, Subject } from 'rxjs';
import { map, startWith } from 'rxjs/operators';
interface ITestPayload {
id: number,
name?: string,
}
interface ICacheEvent {
eventType: 'INIT' | 'ADD' | 'EDIT' | 'DELETE',
payload: ITestPayload,
}
const source: Observable<Array<ITestPayload>> = of([{id: 1, name: 'World'}, {id: 2, name: 'test'}, {id: 3, name: 'azerty'}]);
const event$ = new Subject<ICacheEvent>();
let cache$ = combineLatest([
source,
event$.pipe(startWith({eventType: 'INIT', payload: null})),
]).pipe(
map(([source, event]) => {
if (event.eventType === 'ADD'){
const index = source.map(x => x.id).indexOf(event.payload.id);
if(index === -1) {
source.push(event.payload);
}
}
else if (event.eventType === 'DELETE'){
const index = source.map(x => x.id).indexOf(event.payload.id);
if(index >= 0) {
source.splice(index, 1);
}
}
else if (event.eventType === 'EDIT'){
const index = source.map(x => x.id).indexOf(event.payload.id);
if(index >= 0) {
source[index] = event.payload;
}
}
return source;
})
)
cache$.subscribe(x => console.log(x.map(item => item.name)));
event$.next({eventType: "ADD", payload: {id: 4, name: 'test 4'}})
event$.next({eventType: "EDIT", payload: {id: 2, name: 'value edited'}});
event$.next({eventType: "DELETE", payload: {id: 1}});
Another solution (suggested by a colleague of mine)
import { merge, Observable, of, Subject } from 'rxjs';
import { scan } from 'rxjs/operators';
interface ITestPayload {
id: number,
name?: string,
}
interface ICacheEvent {
eventType: 'INIT' | 'ADD' | 'EDIT' | 'DELETE',
payload: ITestPayload,
}
const source: Observable<Array<ITestPayload>> = of([{id: 1, name: 'World'}, {id: 2, name: 'test'}, {id: 3, name: 'azerty'}]);
const eventSubject$ = new Subject<ICacheEvent>();
const event$ = eventSubject$.asObservable();
const cache$: Observable<Array<ITestPayload>> = merge(
source,
event$,
).pipe(
scan((acc: ITestPayload[], value: ICacheEvent) => {
console.log('scan');
if (value.eventType === 'ADD'){
const index = acc.map(x => x.id).indexOf(value.payload.id);
if(index === -1) {
acc.push(value.payload);
}
}
else if (value.eventType === 'DELETE'){
const index = acc.map(x => x.id).indexOf(value.payload.id);
if(index >= 0) {
acc.splice(index, 1);
}
}
else if (value.eventType === 'EDIT'){
const index = acc.map(x => x.id).indexOf(value.payload.id);
if(index >= 0) {
acc[index] = value.payload;
}
}
return acc;
}))
cache$.subscribe(x => console.log('sub 0 :', x.map(item => item.name)));
eventSubject$.next({eventType: "ADD", payload: {id: 4, name: 'test 4'}})
eventSubject$.next({eventType: "EDIT", payload: {id: 2, name: 'value edited'}});
eventSubject$.next({eventType: "DELETE", payload: {id: 1}});
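Both versions above change the array in place (push, splice, index assignment), so a subscriber that kept a reference to an earlier emission will see it change underneath it. As a sketch of an alternative, assuming that is not what you want, the accumulator can be written as a pure function that returns a new array each time. The function name applyEvent is hypothetical; the interfaces and sample data are the ones from the post, and reduce stands in for scan so the block runs without RxJS.

```typescript
interface ITestPayload {
  id: number;
  name?: string;
}

interface ICacheEvent {
  eventType: 'INIT' | 'ADD' | 'EDIT' | 'DELETE';
  payload: ITestPayload;
}

// Hypothetical pure reducer: returns a new array instead of mutating `acc`.
function applyEvent(acc: ITestPayload[], event: ICacheEvent): ITestPayload[] {
  switch (event.eventType) {
    case 'ADD':
      // Ignore the event if an item with this id is already cached.
      return acc.some(x => x.id === event.payload.id) ? acc : [...acc, event.payload];
    case 'EDIT':
      return acc.map(x => (x.id === event.payload.id ? event.payload : x));
    case 'DELETE':
      return acc.filter(x => x.id !== event.payload.id);
    default:
      return acc; // 'INIT' changes nothing
  }
}

const initial: ITestPayload[] = [
  { id: 1, name: 'World' },
  { id: 2, name: 'test' },
  { id: 3, name: 'azerty' },
];
const events: ICacheEvent[] = [
  { eventType: 'ADD', payload: { id: 4, name: 'test 4' } },
  { eventType: 'EDIT', payload: { id: 2, name: 'value edited' } },
  { eventType: 'DELETE', payload: { id: 1 } },
];

// reduce stands in for scan here.
const result = events.reduce(applyEvent, initial);
console.log(result.map(item => item.name)); // → [ 'value edited', 'azerty', 'test 4' ]
```

With RxJS, the same function could presumably be handed to scan together with the initial items as the seed, for example inside a switchMap on source as in the first answer.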