使用 redux-observable 实现背压

Backpressure implementation with redux-observable

我正在尝试在我的 React 应用程序中实现背压（backpressure）逻辑。我在这里找到了一篇关于这个主题的很好的文章，并尝试把其中的做法应用到我的应用程序中。目前我的代码如下：

// epic.ts
import { ofType } from 'redux-observable';
import { mapTo, tap, delay, switchMap } from 'rxjs/operators';
import { createIteratorSubject } from './createIteratorSubject';

/**
 * Fake data source; in the real application the values are supposed to be pulled from a server.
 * Yields a mix of plain numbers and promises, in this order: 1, 2, Promise(3), Promise(4), 5.
 */
function* generator() {
  // Each entry is a thunk so that a promise is only created when its turn comes.
  const producers = [
    () => 1,
    () => 2,
    () => Promise.resolve(3),
    () => Promise.resolve(4),
    () => 5,
  ];

  for (const produce of producers) {
    yield produce();
  }
}

// Single module-level subject used by the epic below. createIteratorSubject already
// requests the first value from the generator when it is called, i.e. at module load.
const iterator$ = createIteratorSubject(generator());

/**
 * Epic that reacts to 'TAKE' actions by processing the values of the shared `iterator$`
 * one at a time and is meant to turn each processed value into a 'PUT' action.
 *
 * NOTE(review): this is the version that produces the error described below it.
 * See the two notes inside the body for the likely causes.
 */
export function epic(action$: any): any {
  return action$.pipe(
    ofType('TAKE'),
    switchMap(() => {
      // NOTE(review): the chain below ends in `.subscribe(...)`, so this callback
      // returns a Subscription to switchMap instead of an observable it can flatten.
      return iterator$
        .pipe(
          tap((value) => console.info('INCOMING VALUE', value)),
          delay(1000), // Some hard calculations here
          tap((value) => console.info('DONE PROCESSING VALUE', value))
        )
        .subscribe({
          // Hands the processed value to `iterator$.push`, which advances the
          // underlying iterator; the next value is only requested after this one is done.
          next: iterator$.push,
          complete: () => {
            console.info('DONE PROCESSING ALL VALUES');
          },
        });
    }),
    // NOTE(review): mapTo is given a function here. mapTo emits its argument as a
    // constant, so presumably `map` was intended — confirm against the RxJS docs.
    mapTo((value: number) => {
      return { type: 'PUT', payload: value };
    })
  );
}
// createIteratorSubject.ts
import { BehaviorSubject } from 'rxjs';

/**
 * Wraps an iterator in a BehaviorSubject that emits one iterator result at a time.
 *
 * The returned subject carries an extra `push(value?)` method. Each call to `push`
 * advances the iterator once via `iterator.next(value)`, awaits the yielded value
 * (so yielded promises are unwrapped) and emits it on the subject. When the iterator
 * reports `done` with an `undefined` value, the subject completes instead. The first
 * value is requested immediately, before the subject is returned.
 *
 * If advancing the iterator throws, or a yielded promise rejects, the failure is
 * delivered through the subject's error channel instead of surfacing as an
 * unhandled promise rejection; the promise returned by `push` then resolves normally.
 *
 * @param iterator - Synchronous iterator (for example a generator object) to pull from.
 * @returns The subject, extended with `push`. Its current value is `undefined` until
 *   the first pulled value has been awaited.
 */
export function createIteratorSubject(iterator: Iterator<unknown, unknown, unknown>) {
  // BehaviorSubject needs an explicit seed value; `undefined` means "nothing pulled yet".
  const subject$ = new BehaviorSubject<unknown>(undefined);

  const push = async (value?: unknown): Promise<void> => {
    try {
      const { done, value: pulled } = iterator.next(value);
      if (done && pulled === undefined) {
        subject$.complete();
      } else {
        // A finished iterator that returned a value still emits that value;
        // the following push() then completes the subject.
        subject$.next(await pulled);
      }
    } catch (error) {
      subject$.error(error);
    }
  };

  // Attach `push` in a way the type system can see, rather than assigning an
  // undeclared property onto the subject.
  const iterator$ = Object.assign(subject$, { push });

  // Eagerly request the first value; failures are already routed to the subject.
  void iterator$.push();

  return iterator$;
}

我面临的问题是：我不知道如何把处理后的结果值分派（dispatch）给 redux。目前这段代码会出现以下错误。

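您在 switchMap 中返回的是一个 Subscription（订阅），而 switchMap 需要的是 ObservableInput。请不要在内部调用 subscribe，而是把 next/complete 回调放进 tap 中；同时把 mapTo 换成 map（需要从 'rxjs/operators' 导入），因为 mapTo 只会发出一个常量值，而不会调用映射函数。您可以像下面这样修改代码使其正常工作：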

/**
 * Epic: on every 'TAKE' action, subscribes to the shared `iterator$`, processes each
 * value (with a simulated 1 s of work), asks the iterator for the next value once the
 * current one is done, and emits a 'PUT' action carrying the processed value.
 *
 * @param action$ - Stream of dispatched actions.
 * @returns Stream of `{ type: 'PUT', payload }` actions.
 */
export function epic(action$: any): any {
  // Wraps a processed value in the action that is emitted by the epic.
  const toPutAction = (payload: number) => ({ type: 'PUT', payload });

  // Builds the processing pipeline over the shared iterator subject for one 'TAKE'.
  const processIterator = () => {
    const processed$ = iterator$.pipe(
      tap((incoming) => console.info('INCOMING VALUE', incoming)),
      delay(1000), // Some hard calculations here
      tap((processed) => console.info('DONE PROCESSING VALUE', processed)),
      // Only after a value has been fully processed is the next one requested.
      tap({
        next: iterator$.push,
        complete: () => console.info('DONE PROCESSING ALL VALUES'),
      })
    );
    return processed$;
  };

  const take$ = action$.pipe(ofType('TAKE'));
  return take$.pipe(switchMap(processIterator), map(toPutAction));
}