RXJS repeat 没有机会重复?

RXJS repeat does not have a chance to repeat?

我在我的应用程序中使用以下史诗来处理 api 请求:

action$ => {
  return action$.ofType(actions.requestType)
    .do(() => console.log('handled epic ' + actions.requestType))
    .switchMap((action) => (
      Observable.create((obs) => {
        obs.next({ type: type, value: action.value, form: action.form });
      })
      .debounceTime(250)
      .switchMap((iea) => (
        Observable.ajax(ajaxPost(url(iea.value), body ? body(iea.value) : action.form))
          .mergeMap(payload => {
            return Observable.merge(
              Observable.of(actions.success(payload)),
              /* some other stuff */
            );
          })
          .catch(payload => {
            return [actions.failure(payload)];
          })
      ))
  ))
  .takeUntil(action$.filter((a) => (a.type === masterCancelAction))
  .repeat();
};

基本上,每当我执行 api 请求时,我都会发送一个请求操作。如果我快速发送另一个请求,则使用 debounceTime 忽略前一个请求。此外,可以使用 masterCancelAction 取消请求,并且在取消时 repeat() 重新启动史诗。这部史诗在所有情况下都按预期工作,除了一个。

失败案例发生在用户在请求期间使用浏览器返回时。在这种情况下,我将 masterCancelAction 触发到请求。然而,在与 masterCancelAction 的结果相同的执行上下文中,另一个请求操作调度以对同一史诗执行新请求,但是 api 请求不会发生(尽管 console.log 确实发生)好像没有 repeat()。在其他发生取消的情况下,不会从相同的执行上下文调用下一个请求并且它工作正常,所以在这种情况下我的代码似乎没有给 repeat 重新启动史诗的机会?

我发现的一个肮脏的解决方法是在取消后分派的请求上使用 setTimeout(dispatch(action), 0)。这似乎允许 repeat() 执行。我尝试将不同的调度程序传递给 repeat,但这似乎没有帮助。此外,将 takeUntil 和 repeat 附加到我的内部 switchMap 中可以解决问题,但是我的下一个请求不在同一调用堆栈中执行的其他情况会失败。

有没有不用setTimeout就能解决这个问题的方法?可能不是repeat相关的问题,但好像是这样。

使用 rxjs 5.0.3 和 redux-observable 0.14.1.

如果没有像 jsbin 这样的东西来理解你的意思,这个问题不是 100% 清楚,但我确实看到一些可能有帮助的一般性问题:

匿名 Observable 永远不会完成

创建自定义匿名 Observable 时,如果您确实希望它完成,请务必调用 observer.complete()。在大多数情况下,不这样做会导致订阅内存泄漏,还可能出现其他奇怪的行为

Observable.create((observer) => {
  observer.next({ type: type, value: action.value, form: action.form });
  observer.complete();
})

Observable.of 等价于:

Observable.of({ type: type, value: action.value, form: action.form })

但是,不清楚为什么要这样做,因为它发出的值在范围内被捕获。

debounceTime 在这种情况下不去抖动,它延迟

由于应用它的匿名 observable 只发出一个项目,debounceTime 将像常规 .delay(250) 一样工作。我打赌你打算改为去抖动 actions.requestType 动作,在这种情况下你需要在 switchMap 之外应用你的去抖动,在 action$.ofType(actions.requestType).

之后

Observable.of 接受任意数量的参数来发出

这更像是一个 "did you know?" 而不是问题,但我注意到你正在合并你的 of/* some other actions */ 我认为会是其他 of observables合并。相反,您可以只 return 一个 of 并将操作作为参数传递。

Observable.of(
  actions.success(payload),
  /* some other actions */
  actions.someOtherOne(),
  actions.etc()
);

此外,当您发现自己像这样同步发出多个动作时,请考虑您的 reducer 是否应该监听相同的单个动作,而不是有两个或更多。有时这没有意义,因为您希望它们具有完全不相关的操作,只是要记住人们经常忘记的事情——所有 reducer 都会收到所有操作,因此多个 reducer 可以从同一个操作更改它们的状态。

.takeUntil 将阻止史诗监听未来的动作

takeUntil 放在 top-level 可观察链上会导致史诗停止侦听 action$.ofType(actions.requestType),这就是您在后面添加 .repeat() 的原因。这在某些情况下可能有效,但效率低下并且可能导致其他难以发现的错误。 Epics 应该被视为有点像边车进程,通常 "start up" 与应用程序一起,然后继续侦听特定操作,直到应用程序 "shuts down" 即用户离开应用程序。它们实际上 不是进程,从概念上将它们视为一种抽象只是很有帮助。

因此,每次它匹配其特定操作时,它通常会 switchMapmergeMapconcatMapexhaustMap 产生一些副作用,例如 ajax打电话。 inner observable chain 是你想要取消的。所以你会把你的 .takeUntil 放在它上面,在链中的适当位置。


总结

如前所述,如果没有像 jsbin 这样的更完整的示例,则不清楚您打算做什么以及问题是什么。但严格根据提供的代码,这是我的猜测:

const someRequestEpic = action$ => {
  return action$.ofType(actions.requestType)
    .debounceTime(250)
    .do(() => console.log('handled epic ' + actions.requestType))
    .switchMap((action) =>
      Observable.ajax(ajaxPost(url(action.value), body ? body(action.value) : action.form))
        .takeUntil(action$.ofType(masterCancelAction))
        .mergeMap(payload => {
          return Observable.of(
            actions.success(payload),
            /* some other actions */
            ...etc
          );
        })
        .catch(payload => Observable.of(
          actions.failure(payload)
        ))
    );
};

查看 redux-observable 文档中的 Cancellation 页面。


如果这有点令人困惑,我建议更深入地了解 Observables 是什么以及 "operator" 是什么以及它的作用,这样就不会让人觉得它很神奇,也不会觉得你应该把操作符放在哪里更有意义。

Ben 在 Learning Observable by Building Observable 上的 post 是一个好的开始。