等待导致重复动作的状态变化发出
Wait for change in state causing repeated action emit
当我的史诗在 ofType
点响应时,我需要等到 state$.value.foo
变为 true
。一旦它为真,那么我希望它到达 from
,它会执行获取和重要的事情。我是这样做的:
action$.pipe(
ofType(START_CONTINUE_SESSION),
concat(
iif(
() => state$.value.foo === true,
EMPTY,
action$.pipe(
filter(() => state$.value.foo === true),
)
),
from(fetch(...)).pipe(
// ... do important stuff here BUT only after state$.value.foo has become true
)
)
)
发生的事情是我发出了超大量的动作,但它永远不会到达 from(fetch))
。
首先,请注意 concat
运算符在 RxJS 6 中已弃用(那将是从 rxjs/operators
导入的)。但是,用于创建可观察对象的函数 concat
未 已弃用(即从 rxjs
导入的函数)。我建议使用未弃用的运算符。
其次,您当前的方法存在几个问题。
action$.pipe(
ofType(START_CONTINUE_SESSION),
concat(
...
上面的过滤操作匹配类型 "START_CONTINUE_SESSION" 并允许它们传回 Redux store。这是因为 concat
运算符让源事件通过并等待前一个可观察对象完成 在 开始下一个可观察对象之前。但是,由于 redux-observable 操作流永远不会完成,concat
应该 永远不会开始下一个可观察对象!从旧的 RxJS 文档中查看以下弹珠图:
如图所示,源事件通过。使用 redux-obserable,这意味着您的 "START_CONTINUE_SESSION" 操作将陷入永无止境的重复循环。
即使动作流结束并且 concat
开始下一个可观察对象,也存在其他问题:
...
iif(
() => state$.value.foo === true,
EMPTY, // I've assumed that this is equivalent to `empty()`
action$.pipe(
filter(() => state$.value.foo === true),
),
),
...
您首先要检查商店中 foo
的当前值。如果它的值为 true
,则不再发出任何信息(仅在该特定步骤中),接下来将开始 from
。如果其值为 false
,则会创建一个新的动作流订阅。对于调度的每个未来动作,这会检查商店中 foo
的当前值。当它的值最终变为 true
时,流入的动作(可以是任何动作!)被 允许传回 Redux 存储 。但请注意,此订阅永远不会结束!同样,只要 foo
保持 true
.
,你就会有一个永无止境的动作循环进出 Redux 存储
您应该订阅 state$
,而不是订阅 action$
并检查状态。下面的示例有点不同,但我认为它展示了一种实现目标的方法。这会等待初始操作 (START_CONTINUE_SESSION
),然后等待状态变为 foo === true
,然后将操作和状态发送到 mergeMap
,您可以在其中正常处理它(获取,调度其他动作等)。如果您不需要状态副本,则可以忽略它。
export const epic = (action$, state$) =>
action$.pipe(
ofType(START_CONTINUE_SESSION),
withLatestFrom(state$),
exhaustMap(([action, state]) =>
state.foo === true
? of([action, state])
: state$.pipe(
mergeMap(state =>
state.foo === true
? of([action, state])
: empty()
),
first(),
)
),
mergeMap(([action, state]) =>
// here we have triggering action and state with foo === true
from(fetch(...)).pipe(
// ... do important stuff here
)
),
)
至于响应初始动作(START_CONTINUE_SESSION
),我在上面的例子中选择了exhaustMap
。可能的备选方案包括 concatMap
、mergeMap
和 switchMap
。您应该选择最适合您的用例的运算符:
concatMap
- 依次监听所有操作和 运行 多个工作流。
exhaustMap
- 收听第一个操作并等到完成工作流程后再接受另一个操作。
mergeMap
- 监听所有操作和 运行 多个并行工作流。
switchMap
- 监听所有动作,但一次只监听一个 运行。收到新操作时取消任何以前的工作流。
当我的史诗在 ofType
点响应时,我需要等到 state$.value.foo
变为 true
。一旦它为真,那么我希望它到达 from
,它会执行获取和重要的事情。我是这样做的:
action$.pipe(
ofType(START_CONTINUE_SESSION),
concat(
iif(
() => state$.value.foo === true,
EMPTY,
action$.pipe(
filter(() => state$.value.foo === true),
)
),
from(fetch(...)).pipe(
// ... do important stuff here BUT only after state$.value.foo has become true
)
)
)
发生的事情是我发出了超大量的动作,但它永远不会到达 from(fetch))
。
首先,请注意 concat
运算符在 RxJS 6 中已弃用(那将是从 rxjs/operators
导入的)。但是,用于创建可观察对象的函数 concat
未 已弃用(即从 rxjs
导入的函数)。我建议使用未弃用的运算符。
其次,您当前的方法存在几个问题。
action$.pipe(
ofType(START_CONTINUE_SESSION),
concat(
...
上面的过滤操作匹配类型 "START_CONTINUE_SESSION" 并允许它们传回 Redux store。这是因为 concat
运算符让源事件通过并等待前一个可观察对象完成 在 开始下一个可观察对象之前。但是,由于 redux-observable 操作流永远不会完成,concat
应该 永远不会开始下一个可观察对象!从旧的 RxJS 文档中查看以下弹珠图:
如图所示,源事件通过。使用 redux-obserable,这意味着您的 "START_CONTINUE_SESSION" 操作将陷入永无止境的重复循环。
即使动作流结束并且 concat
开始下一个可观察对象,也存在其他问题:
...
iif(
() => state$.value.foo === true,
EMPTY, // I've assumed that this is equivalent to `empty()`
action$.pipe(
filter(() => state$.value.foo === true),
),
),
...
您首先要检查商店中 foo
的当前值。如果它的值为 true
,则不再发出任何信息(仅在该特定步骤中),接下来将开始 from
。如果其值为 false
,则会创建一个新的动作流订阅。对于调度的每个未来动作,这会检查商店中 foo
的当前值。当它的值最终变为 true
时,流入的动作(可以是任何动作!)被 允许传回 Redux 存储 。但请注意,此订阅永远不会结束!同样,只要 foo
保持 true
.
您应该订阅 state$
,而不是订阅 action$
并检查状态。下面的示例有点不同,但我认为它展示了一种实现目标的方法。这会等待初始操作 (START_CONTINUE_SESSION
),然后等待状态变为 foo === true
,然后将操作和状态发送到 mergeMap
,您可以在其中正常处理它(获取,调度其他动作等)。如果您不需要状态副本,则可以忽略它。
export const epic = (action$, state$) =>
action$.pipe(
ofType(START_CONTINUE_SESSION),
withLatestFrom(state$),
exhaustMap(([action, state]) =>
state.foo === true
? of([action, state])
: state$.pipe(
mergeMap(state =>
state.foo === true
? of([action, state])
: empty()
),
first(),
)
),
mergeMap(([action, state]) =>
// here we have triggering action and state with foo === true
from(fetch(...)).pipe(
// ... do important stuff here
)
),
)
至于响应初始动作(START_CONTINUE_SESSION
),我在上面的例子中选择了exhaustMap
。可能的备选方案包括 concatMap
、mergeMap
和 switchMap
。您应该选择最适合您的用例的运算符:
concatMap
- 依次监听所有操作和 运行 多个工作流。exhaustMap
- 收听第一个操作并等到完成工作流程后再接受另一个操作。mergeMap
- 监听所有操作和 运行 多个并行工作流。switchMap
- 监听所有动作,但一次只监听一个 运行。收到新操作时取消任何以前的工作流。