如何正确地对 RxJS 和 redux-observable 中的动作进行排序和反应?

How to properly sequence and react to actions in RxJS and redux-observable?

就 RxJS 而言,我仍在学习中。我在我的项目中使用 redux-observable,我真的很喜欢它。但是,我觉得我没有掌握这里的一些基础知识,所以我想请求你的帮助。

示例代码如下:

const requestDelete = action$ => {
  const actionSuccess$ = action$.pipe(
    ofType(types.AN_ACTION_THAT_MAY_FAIL_SUCCESS),
    take(1),
    mergeMap(_ => of(actions.deleteSuccess()))
  )

  const actionFailed$ = action$.pipe(
    ofType(types.AN_ACTION_THAT_MAY_FAIL_FAILED),
    take(1),
    mergeMap(_ => of(actions.deleteFailed()))
  )

  return action$.pipe(
    ofType(types.REQUEST_DELETE),
    mergeMap(action =>
      from(SomeService.delete(action.payload)).pipe(
        mergeMap(_ =>
          merge(
            actionSuccess$,
            actionFailed$,
            of(actions.requestAnActionThatMayFail())
          )
        ),
        catchError(err => of(actions.deleteFailed(err)))
      )
    ),
    catchError(err =>
      of({ type: 'CRITICAL_REQUEST_DELETE_ERROR', err })
    )
  )
}

这是我心中的流程:

  1. 用户请求删除一些资源。
  2. 调用 SomeService 来执行此操作。
  3. 如果解决了,我想打电话给requestAnActionThatMayFail。现在,我想依赖于该操作的结果并等待它完成。
  4. 如果可以,我想actions.deleteSuccess())
  5. 如果失败,我想用actions.deleteFailed()触发错误等。

现在,问题是 - 此代码 有效 ,但 merge 对我来说感觉很糟糕。我认为我应该使用 concat 并将 of(actions.requestAnActionThatMayFail()) 作为第一个参数。如果我更改流的顺序或使用 concat,整个史诗将无法运行。任何想法可能是什么问题?

If I change the order of streams

我怀疑它不会以这种方式工作,因为 requestAnActionThatMayFail 是一个 同步操作 。这解释了原因,因为 merge 将按顺序 订阅提供的可观察对象 ,这意味着如果你有

merge(
 actionFailed$,
 of(actions.requestAnActionThatMayFail()),
 actionSuccess$,
)

requestAnActionThatMayFail成功了,因为actionSuccess$还没有被订阅(actionFailed$被订阅),所以不会生效。如果 requestAnActionThatMayFail 异步的 ,你应该不会遇到这个问题。

or use concat

concatmergeconcurrent 设置为 1。虽然 merge 可以有多个活动的内部可观察对象,但 concat 只能有一个。当设置 concurrent 时,merge 将缓冲超出的值,当一个内部可观察值 完成 时,它将采用最旧的缓冲值并创建一个内部可观察值
解释与上一节相同:其他可观察对象尚未订阅。

所以,这是我的方法:

const requestDelete = action$ => {
  const actionSuccess$ = action$.pipe(
    ofType(types.AN_ACTION_THAT_MAY_FAIL_SUCCESS),
    take(1),
    mergeMap(_ => of(actions.deleteSuccess()))
  )

  const actionFailed$ = action$.pipe(
    ofType(types.AN_ACTION_THAT_MAY_FAIL_FAILED),
    take(1),
    mergeMap(_ => of(actions.deleteFailed()))
  )

  const requestDelete$ = action$.pipe(
    ofType(types.REQUEST_DELETE),
    mergeMap(action =>
      from(SomeService.delete(action.payload)),
      mergeMap(_ => of(actions.requestAnActionThatMayFail())),
      catchError(err => of(actions.deleteFailed(err)))
    ),

    // This seems a little redundant because of the previous `catchError`
    // catchError(err =>
    //   of({ type: 'CRITICAL_REQUEST_DELETE_ERROR', err })
    // )
  )

  // Make sure all the obs. are susbcribed
  return merge(actionSuccess$, actionFailed$, requestDelete$);
}