如何在 redux-observable 中重新初始化动作?

How to reinit actions in redux-observable?

例如,这段代码 this jsbin example:

const pingEpic = action$ =>
  action$.ofType(PING)
    .delay(1000) // Asynchronously wait 1000ms then continue
    .mapTo({ type: PONG })
    .takeUntil(action$.ofType(CANCEL));

当我如上所述使用 takeUntil 时,在调度 CANCEL 动作并延迟 1 秒后,该动作再也不会触发。为什么?

问题是对 RxJS 工作原理的微妙但严重的误解——但不要害怕,这很常见。

举个例子:

const pingEpic = action$ =>
  action$.ofType(PING)
    .delay(1000)
    .mapTo({ type: PONG })
    .takeUntil(action$.ofType(CANCEL));

这部史诗的行为可以描述为过滤掉所有不匹配类型PING的动作。当一个动作匹配时,等待 1000 毫秒,然后将该动作映射到另一个动作 { type: PONG },该动作将被发出,然后由 redux-observable 调度。如果在 任何 时间,当应用程序 运行 正在运行时,有人发送了 CANCEL 类型的操作,然后 取消订阅源,这意味着整个链将取消订阅,终止史诗。

如果您命令式地执行此操作,看看它的外观可能会有所帮助:

const pingEpic = action$ => {
  return new Rx.Observable(observer => {
    console.log('[pingEpic] subscribe');
    let timer;

    const subscription = action$.subscribe(action => {
      console.log('[pingEpic] received action: ' + action.type);

      // When anyone dispatches CANCEL, we stop listening entirely!
      if (action.type === CANCEL) {
        observer.complete();
        return;
      }

      if (action.type === PING) {
        timer = setTimeout(() => {
          const output = { type: PONG };
          observer.next(output);
        }, 1000);
      }
    });

    return {
      unsubscribe() {
        console.log('[pingEpic] unsubscribe');
        clearTimeout(timer);
        subscription.unsubscribe();
      }
    };
  });
};

您可以 运行 此代码与此处的假商店:http://jsbin.com/zeqasih/edit?js,console


相反,您通常要做的是将您希望取消的订阅者链与假定无限期收听的 top-level 链隔离开来。尽管您的示例(根据文档进行了修改)是人为设计的,但让我们先 运行 看一遍。

这里我们使用 mergeMap 运算符让我们执行匹配的操作并映射到 另一个单独的可观察链 .

演示:http://jsbin.com/nofato/edit?js,output

const pingEpic = action$ =>
  action$.ofType(PING)
    .mergeMap(() =>
      Observable.timer(1000)
        .takeUntil(action$.ofType(CANCEL))
        .mapTo({ type: PONG })
    );

我们使用 Observable.timer 等待 1000 毫秒,然后将它发出的值(碰巧是数字零,但这在这里并不重要)映射到我们的 PONG 操作。我们还说我们想从计时器源“获取”,直到它正常完成或我们收到类型为 CANCEL.

的操作

这隔离了链,因为 mergeMap 将继续订阅您 return 的可观察对象,直到它出错或完成。但是,当发生这种情况时,它 本身 并不会停止订阅您应用它的来源;本例中的 action$.ofType(PING)

更多 real-world 示例在 Cancellation section

的 redux-observable 文档中

Here we placed the .takeUntil() after inside our .mergeMap(), but after our AJAX call; this is important because we want to cancel only the AJAX request, not stop the Epic from listening for any future actions.

const fetchUserEpic = action$ =>
  action$.ofType(FETCH_USER)
    .mergeMap(action =>
      ajax.getJSON(`/api/users/${action.payload}`)
        .map(fetchUserFulfilled)
        .takeUntil(action$.ofType(FETCH_USER_CANCELLED))
    );

这一切听起来可能令人困惑,但就像最强大的东西一样,一旦你掌握了它,它就会变得直观。 Ben Lesh 出色地解释了 Observables 如何在 his recent talk 中工作,包括讨论操作符如何成为 Observables 链,甚至关于隔离订阅者链。尽管谈话是在 AngularConnect 上进行的,但它并不是 Angular 具体的。


顺便说一句,重要的是要注意您的 epics 不要吞咽或以其他方式阻止动作到达减速器,例如当您将传入操作映射到另一个不同的操作时。事实上,当你的史诗收到一个动作时,它 已经通过你的减速器 。将您的 epics 视为侦听应用程序操作流的边车进程,但无法阻止正常的 redux 事件发生,它只能发出新操作。