使用 redux-observable 实现背压
Backpressure implementation with redux-observable
我正在尝试在我的 React 应用程序中实现背压逻辑。
我发现一个很好的 post 关于这个 here 并试图将它添加到我的应用程序。
现在我有一些代码:
// epic.ts
import { ofType } from 'redux-observable';
import { mapTo, tap, delay, switchMap } from 'rxjs/operators';
import { createIteratorSubject } from './createIteratorSubject';
// fake generator which in real application is supposed to pull data from a server
function* generator() {
yield 1;
yield 2;
yield Promise.resolve(3);
yield Promise.resolve(4);
yield 5;
}
const iterator$ = createIteratorSubject(generator());
export function epic(action$: any): any {
return action$.pipe(
ofType('TAKE'),
switchMap(() => {
return iterator$
.pipe(
tap((value) => console.info('INCOMING VALUE', value)),
delay(1000), // Some hard calculations here
tap((value) => console.info('DONE PROCESSING VALUE', value))
)
.subscribe({
next: iterator$.push,
complete: () => {
console.info('DONE PROCESSING ALL VALUES');
},
});
}),
mapTo((value: number) => {
return { type: 'PUT', payload: value };
})
);
}
// createIteratorSubject.ts
import { BehaviorSubject } from 'rxjs';
export function createIteratorSubject(iterator: any) {
const iterator$ = new BehaviorSubject();
const pushNextValue = async ({ done, value }: any) => {
if (done && value === undefined) {
iterator$.complete();
} else {
iterator$.next(await value);
}
};
iterator$.push = (value: any) => {
return pushNextValue(iterator.next(value));
};
iterator$.push();
return iterator$;
}
我面临的问题是我不知道如何将结果值分派给 redux。现在我有以下错误。
您在 switchMap
中返回一个订阅,其中需要 ObservableInput
。您可以像这样更改代码以使其工作:
export function epic(action$: any): any {
return action$.pipe(
ofType('TAKE'),
switchMap(() => {
return iterator$
.pipe(
tap((value) => console.info('INCOMING VALUE', value)),
delay(1000), // Some hard calculations here
tap((value) => console.info('DONE PROCESSING VALUE', value)),
tap({
next: iterator$.push,
complete: () => {
console.info('DONE PROCESSING ALL VALUES');
}
})
);
}),
map((value: number) => {
return { type: 'PUT', payload: value };
})
);
}
我正在尝试在我的 React 应用程序中实现背压逻辑。 我发现一个很好的 post 关于这个 here 并试图将它添加到我的应用程序。 现在我有一些代码:
// epic.ts
import { ofType } from 'redux-observable';
import { mapTo, tap, delay, switchMap } from 'rxjs/operators';
import { createIteratorSubject } from './createIteratorSubject';
// fake generator which in real application is supposed to pull data from a server
function* generator() {
yield 1;
yield 2;
yield Promise.resolve(3);
yield Promise.resolve(4);
yield 5;
}
const iterator$ = createIteratorSubject(generator());
export function epic(action$: any): any {
return action$.pipe(
ofType('TAKE'),
switchMap(() => {
return iterator$
.pipe(
tap((value) => console.info('INCOMING VALUE', value)),
delay(1000), // Some hard calculations here
tap((value) => console.info('DONE PROCESSING VALUE', value))
)
.subscribe({
next: iterator$.push,
complete: () => {
console.info('DONE PROCESSING ALL VALUES');
},
});
}),
mapTo((value: number) => {
return { type: 'PUT', payload: value };
})
);
}
// createIteratorSubject.ts
import { BehaviorSubject } from 'rxjs';
export function createIteratorSubject(iterator: any) {
const iterator$ = new BehaviorSubject();
const pushNextValue = async ({ done, value }: any) => {
if (done && value === undefined) {
iterator$.complete();
} else {
iterator$.next(await value);
}
};
iterator$.push = (value: any) => {
return pushNextValue(iterator.next(value));
};
iterator$.push();
return iterator$;
}
我面临的问题是我不知道如何将结果值分派给 redux。现在我有以下错误。
您在 switchMap
中返回一个订阅,其中需要 ObservableInput
。您可以像这样更改代码以使其工作:
export function epic(action$: any): any {
return action$.pipe(
ofType('TAKE'),
switchMap(() => {
return iterator$
.pipe(
tap((value) => console.info('INCOMING VALUE', value)),
delay(1000), // Some hard calculations here
tap((value) => console.info('DONE PROCESSING VALUE', value)),
tap({
next: iterator$.push,
complete: () => {
console.info('DONE PROCESSING ALL VALUES');
}
})
);
}),
map((value: number) => {
return { type: 'PUT', payload: value };
})
);
}