等待导致重复动作的状态变化发出

Wait for change in state causing repeated action emit

当我的史诗在 ofType 点响应时,我需要等到 state$.value.foo 变为 true。一旦它为真,那么我希望它到达 from ,它会执行获取和重要的事情。我是这样做的:

action$.pipe(
    ofType(START_CONTINUE_SESSION),
    concat(
        iif(
            () => state$.value.foo === true,
            EMPTY,
            action$.pipe(
                filter(() => state$.value.foo === true),
            )
        ),
        from(fetch(...)).pipe(
            // ... do important stuff here BUT only after state$.value.foo has become true
        )
    )
)

发生的事情是我发出了超大量的动作,但它永远不会到达 from(fetch))

首先,请注意 concat 运算符在 RxJS 6 中已弃用(那将是从 rxjs/operators 导入的)。但是,用于创建可观察对象的函数 concat 已弃用(即从 rxjs 导入的函数)。我建议使用未弃用的运算符。

其次,您当前的方法存在几个问题。

action$.pipe(
  ofType(START_CONTINUE_SESSION),
  concat(
    ...

上面的过滤操作匹配类型 "START_CONTINUE_SESSION" 并允许它们传回 Redux store。这是因为 concat 运算符让源事件通过并等待前一个可观察对象完成 开始下一个可观察对象之前。但是,由于 redux-observable 操作流永远不会完成,concat 应该 永远不会开始下一个可观察对象!从旧的 RxJS 文档中查看以下弹珠图:

如图所示,源事件通过。使用 redux-obserable,这意味着您的 "START_CONTINUE_SESSION" 操作将陷入永无止境的重复循环。

即使动作流结束并且 concat 开始下一个可观察对象,也存在其他问题:

  ...
  iif(
    () => state$.value.foo === true,
    EMPTY, // I've assumed that this is equivalent to `empty()`
    action$.pipe(
      filter(() => state$.value.foo === true),
    ),
  ),
  ...

您首先要检查商店中 foo 的当前值。如果它的值为 true,则不再发出任何信息(仅在该特定步骤中),接下来将开始 from。如果其值为 false,则会创建一个新的动作流订阅。对于调度的每个未来动作,这会检查商店中 foo 的当前值。当它的值最终变为 true 时,流入的动作(可以是任何动作!)被 允许传回 Redux 存储 。但请注意,此订阅永远不会结束!同样,只要 foo 保持 true.

,你就会有一个永无止境的动作循环进出 Redux 存储

您应该订阅 state$,而不是订阅 action$ 并检查状态。下面的示例有点不同,但我认为它展示了一种实现目标的方法。这会等待初始操作 (START_CONTINUE_SESSION),然后等待状态变为 foo === true,然后将操作和状态发送到 mergeMap,您可以在其中正常处理它(获取,调度其他动作等)。如果您不需要状态副本,则可以忽略它。

export const epic = (action$, state$) =>
  action$.pipe(
    ofType(START_CONTINUE_SESSION),
    withLatestFrom(state$),
    exhaustMap(([action, state]) =>
      state.foo === true
        ? of([action, state])
        : state$.pipe(
            mergeMap(state =>
              state.foo === true
                ? of([action, state])
                : empty()
            ),
            first(),
          )
    ),
    mergeMap(([action, state]) =>
      // here we have triggering action and state with foo === true
      from(fetch(...)).pipe(
        // ... do important stuff here
      )
    ),
  )

至于响应初始动作(START_CONTINUE_SESSION),我在上面的例子中选择了exhaustMap。可能的备选方案包括 concatMapmergeMapswitchMap。您应该选择最适合您的用例的运算符:

  • concatMap - 依次监听所有操作和 运行 多个工作流。
  • exhaustMap - 收听第一个操作并等到完成工作流程后再接受另一个操作。
  • mergeMap - 监听所有操作和 运行 多个并行工作流。
  • switchMap - 监听所有动作,但一次只监听一个 运行。收到新操作时取消任何以前的工作流。