我如何在 RxJS/redux-observable 和 redux-saga 中对动作进行排序?

How do I sequence actions in RxJS/redux-observable vs redux-saga?

我已经开始深入学习 RxJs,原因之一是掌握 redux-observable 副作用方法,但我发现 sagas 更方便 "declarative"。我已经学习了 merge/flat/concat/switchMap 运算符,但它并没有帮助我弄清楚如何在 rxjs 中对事物进行排序。

这是我所说的 "sequencing" 的示例,在定时器应用程序的实例中,可以在一段时间后安排启动,使用 redux-saga:

实现
export function* timerSaga() {
  while (true) {
    yield take('START');

    const { startDelay } = yield select(); // scheduled delay

    const [cancelled] = yield race([
      take('CANCEL_START'),
      delay(startDelay)
    ]);

    if (!cancelled) {
      yield race([
        call(function*() {
           while (true) {
             yield delay(10);
             yield put({ type: 'TICK' });
           }
        }),
        take(['STOP', 'RESET']
      ]);
    }
  }
}

我发现这个例子在逻辑上非常一致和清晰。我不知道如何使用 redux-observable 来实现它。拜托,简单地给我重现相同逻辑但使用 rxjs 运算符的代码。

我假设 take() return 是可观察的,还没有测试代码。它可能会像下面这样转换为 rx 时尚。

这里的关键是repeat()takeUntil()

// outter condition for starting ticker
forkJoin(take('START'), select())
    .pipe(
        switchMap(([, startDelay]) =>
            // inner looping ticker
            timer(10).pipe(switchMap(_ => put({type: 'TICK'})), repeat(),
                takeUntil(race(
                    take('CANCEL_START'),
                    delay(startDelay)
                ))
            )
            /////////////////////
        )
    )

在 sagas(生成器)和 epics(可观察对象)之间,改变很重要 您考虑事件如何到达您的代码的方式。

生成器满足迭代器和可迭代协议,其中涉及拉取 values/events(在本例中为 Redux 操作)来自源代码,并进行阻塞 执行直到这些事件到达。

Observables 是 push 而不是 pull。我们描述并命名事件流 我们感兴趣的,然后我们订阅它们。没有阻塞 调用,因为我们所有的代码都是在事件发生时由事件触发的。

这段代码重复了 saga 示例中的行为。

import { interval, timer } from 'rxjs';
import { withLatestFrom, mapTo, exhaustMap, takeUntil } from 'rxjs/operators';
import { ofType } from 'redux-observable';

const myEpic = (action$, state$) => {
  // A stream of all the "cancel start" actions
  const cancelStart$ = action$.pipe(ofType('CANCEL_START'));

  // This observable will emit delayed start events that are not cancelled.
  const delayedCancellableStarts$ = action$.pipe(
    // When a start action occurs...
    ofType('START'), 

    // Grab the latest start delay value from state...
    withLatestFrom(state$, (_, { startDelay }) => startDelay),

    exhaustMap(
      // ...and emit an event after our delay, unless our cancel stream
      // emits first, then do nothing until the next start event arrives.

      // exhaustMap means we ignore all other start events while we handle
      // this one.
      (startDelay) => timer(startDelay).pipe(takeUntil(cancelStart$))
    )
  );

  // On subscribe, emit a tick action every 10ms
  const tick$ = interval(10).pipe(mapTo({ type: 'TICK' }));

  // On subscribe, emit only STOP or RESET actions
  const stopTick$ = action$.pipe(ofType('STOP', 'RESET'));

  // When a start event arrives, start ticking until we get a message to
  // stop. Ignore all start events until we stop ticking.
  return delayedCancellableStarts$.pipe(
    exhaustMap(() => tick$.pipe(takeUntil(stopTick$)))
  );
};

重要的是,尽管我们正在创建和命名这些可观察流,但它们的行为是惰性的 - none 其中 'activated' 直到被订阅,当您提供这个史诗般的功能时就会发生这种情况到 redux-observable 中间件。