rxjs - 如何在捕获并处理发出错误的错误后重试

rxjs - How to retry after catching and processing an error with emitting something

我正在使用 rxjs v5.4.3、redux-observable v0.16.0

在我的申请中,我想实现以下目标:

我想达到:

const fetchEpic = (action$: ActionsObservable<Action>, store: Store<IRootState>) => action$
  .ofAction(actions.fetchPost)
  .mergeMap(({ payload: { postId } })) => {
    const { authToken, refreshToken } = store.getState().auth;

    return api.fetchPost({ postId, authToken }) // this returns Observable<ResponseJSON>
      .map(res => actions.fetchSuccess({ res })) // if success, just emit success-action with the response
      .catch(err => {
        if (isAuthTokenExpiredError(err) {
          return api.reAuthenticate({ refreshToken })
            .map(res => actions.refreshTokenSuccess({ authToken: res.authToken });
            .catch(actions.fetchFailure({ err }))
            // and retry fetchPost after re-authenticate!
        }

        return Observable.of(actions.fetchFailure({ err }))
      })
  }

有什么解决办法吗?

有很多方法可以做到这一点,但我建议将重新身份验证拆分成自己的史诗,以便更容易 maintain/test/reuse。

这可能是这样的:

const reAuthenticateEpic = (action$, store) =>
  action$.ofType(actions.reAuthenticate)
    .switchMap(() => {
      const { refreshToken } = store.getState().auth;

      return api.reAuthenticate({ refreshToken })
        .map(res => actions.refreshTokenSuccess({ authToken: res.authToken }))
        .catch(err => Observable.of(
          actions.refreshTokenFailure({ err })
        ));
    });

我们还想使用 Observable.defer 这样的东西,这样每次重试时,我们都会查找 authToken 最新 版本:

Observable.defer(() => {
  const { authToken } = store.getState().auth;
    return api.fetchPost({ postId, authToken });
})

当我们在 fetchEpic 中捕获错误并检测到 isAuthTokenExpiredError 时,我们 return 一个 Observable 链:

  1. 开始侦听单个 refreshTokenSuccess,发出我们可以重试的信号
  2. 以防万一重新验证失败,我们用 .takeUntil(action$.ofType(refreshTokenFailure)) 监听它,这样我们就不会永远等待——您可能希望以不同的方式处理这种情况,您的电话。
  3. mergeMap 到原始源,这是 catch 回调的第二个参数。 "source" 是 catch 之前的 Observable 链,由于 Observable 是惰性的,当我们收到 refreshTokenSuccess 操作时,它将再次重新订阅该链,实际上是 "retrying"
  4. 将上面的链与一个 reAuthenticate 动作的 Observable 合并。这用于启动实际的重新授权。

总结一下:我们从 catch return 的 Observable 链将首先开始监听 refreshTokenSuccess,然后它发出 reAuthenticate,然后当(如果)我们接收 refreshTokenSuccess 然后我们将 "retry" 源,我们的 api.fetchPost() 链在我们包裹在 Observable.defercatch 之上。如果 refreshTokenFailure 在我们收到 refreshTokenSuccess 之前发出,我们将完全放弃。

const fetchEpic = (action$, store) =>
  action$.ofType(actions.fetchPost)
    .mergeMap(({ payload: { postId } })) =>
      Observable.defer(() => {
        const { authToken } = store.getState().auth;
        return api.fetchPost({ postId, authToken });
      })
        .map(res => actions.fetchSuccess({ res }))
        .catch((err, source) => {
          if (isAuthTokenExpiredError(err)) {
            // Start listening for refreshTokenSuccess, then kick off the reauth
            return action$.ofType(actions.refreshTokenSuccess)
              .takeUntil(action$.ofType(refreshTokenFailure))
              .take(1)
              .mergeMapTo(source) // same as .mergeMap(() => source)
              .merge(
                Observable.of(action.reAuthenticate())
              );
          } else {
            return Observable.of(actions.fetchFailure({ err }));
          }
        });
    );

这些示例未经测试,因此我可能会遇到一些小问题,但希望您能理解要点。可能还有一种更优雅的方法来执行此操作,但这至少会解除对您的阻止。 (如果其他人可以降低复杂性,我们非常欢迎他们编辑此答案)


旁注

  • 这会产生无限重试的可能性,这可能会在用户的浏览器或您的服务器中引起严重的问题。只重试一定次数可能是个好主意,and/or 在重试之间设置某种延迟。在实践中,这可能不值得担心,你最清楚。

  • 您(或稍后阅读本文的其他人)可能会想使用 .startWith(action.reAuthenticate()) 而不是合并,但请注意 startWith 只是 shorthand 对于 concat,而不是 merge,这意味着它会在我们开始侦听成功之前同步发出操作。通常这不是问题,因为 http 请求是异步的,但是 .