如何正确地对 RxJS 和 redux-observable 中的动作进行排序和反应?
How to properly sequence and react to actions in RxJS and redux-observable?
就 RxJS 而言,我仍在学习中。我在我的项目中使用 redux-observable
,我真的很喜欢它。但是,我觉得我没有掌握这里的一些基础知识,所以我想请求你的帮助。
示例代码如下:
const requestDelete = action$ => {
const actionSuccess$ = action$.pipe(
ofType(types.AN_ACTION_THAT_MAY_FAIL_SUCCESS),
take(1),
mergeMap(_ => of(actions.deleteSuccess()))
)
const actionFailed$ = action$.pipe(
ofType(types.AN_ACTION_THAT_MAY_FAIL_FAILED),
take(1),
mergeMap(_ => of(actions.deleteFailed()))
)
return action$.pipe(
ofType(types.REQUEST_DELETE),
mergeMap(action =>
from(SomeService.delete(action.payload)).pipe(
mergeMap(_ =>
merge(
actionSuccess$,
actionFailed$,
of(actions.requestAnActionThatMayFail())
)
),
catchError(err => of(actions.deleteFailed(err)))
)
),
catchError(err =>
of({ type: 'CRITICAL_REQUEST_DELETE_ERROR', err })
)
)
}
这是我心中的流程:
- 用户请求删除一些资源。
- 调用
SomeService
来执行此操作。
- 如果解决了,我想打电话给
requestAnActionThatMayFail
。现在,我想依赖于该操作的结果并等待它完成。
- 如果可以,我想
actions.deleteSuccess())
。
- 如果失败,我想用
actions.deleteFailed()
触发错误等。
现在,问题是 - 此代码 有效 ,但 merge
对我来说感觉很糟糕。我认为我应该使用 concat
并将 of(actions.requestAnActionThatMayFail())
作为第一个参数。如果我更改流的顺序或使用 concat
,整个史诗将无法运行。任何想法可能是什么问题?
If I change the order of streams
我怀疑它不会以这种方式工作,因为 requestAnActionThatMayFail
是一个 同步操作 。这解释了原因,因为 merge
将按顺序 订阅提供的可观察对象 ,这意味着如果你有
merge(
actionFailed$,
of(actions.requestAnActionThatMayFail()),
actionSuccess$,
)
和requestAnActionThatMayFail
成功了,因为actionSuccess$
还没有被订阅(actionFailed$
被订阅),所以不会生效。如果 requestAnActionThatMayFail
是 异步的 ,你应该不会遇到这个问题。
or use concat
concat
是 merge
,concurrent
设置为 1。虽然 merge
可以有多个活动的内部可观察对象,但 concat
只能有一个。当设置 concurrent
时,merge
将缓冲超出的值,当一个内部可观察值 完成 时,它将采用最旧的缓冲值并创建一个内部可观察值
解释与上一节相同:其他可观察对象尚未订阅。
所以,这是我的方法:
const requestDelete = action$ => {
const actionSuccess$ = action$.pipe(
ofType(types.AN_ACTION_THAT_MAY_FAIL_SUCCESS),
take(1),
mergeMap(_ => of(actions.deleteSuccess()))
)
const actionFailed$ = action$.pipe(
ofType(types.AN_ACTION_THAT_MAY_FAIL_FAILED),
take(1),
mergeMap(_ => of(actions.deleteFailed()))
)
const requestDelete$ = action$.pipe(
ofType(types.REQUEST_DELETE),
mergeMap(action =>
from(SomeService.delete(action.payload)),
mergeMap(_ => of(actions.requestAnActionThatMayFail())),
catchError(err => of(actions.deleteFailed(err)))
),
// This seems a little redundant because of the previous `catchError`
// catchError(err =>
// of({ type: 'CRITICAL_REQUEST_DELETE_ERROR', err })
// )
)
// Make sure all the obs. are susbcribed
return merge(actionSuccess$, actionFailed$, requestDelete$);
}
就 RxJS 而言,我仍在学习中。我在我的项目中使用 redux-observable
,我真的很喜欢它。但是,我觉得我没有掌握这里的一些基础知识,所以我想请求你的帮助。
示例代码如下:
const requestDelete = action$ => {
const actionSuccess$ = action$.pipe(
ofType(types.AN_ACTION_THAT_MAY_FAIL_SUCCESS),
take(1),
mergeMap(_ => of(actions.deleteSuccess()))
)
const actionFailed$ = action$.pipe(
ofType(types.AN_ACTION_THAT_MAY_FAIL_FAILED),
take(1),
mergeMap(_ => of(actions.deleteFailed()))
)
return action$.pipe(
ofType(types.REQUEST_DELETE),
mergeMap(action =>
from(SomeService.delete(action.payload)).pipe(
mergeMap(_ =>
merge(
actionSuccess$,
actionFailed$,
of(actions.requestAnActionThatMayFail())
)
),
catchError(err => of(actions.deleteFailed(err)))
)
),
catchError(err =>
of({ type: 'CRITICAL_REQUEST_DELETE_ERROR', err })
)
)
}
这是我心中的流程:
- 用户请求删除一些资源。
- 调用
SomeService
来执行此操作。 - 如果解决了,我想打电话给
requestAnActionThatMayFail
。现在,我想依赖于该操作的结果并等待它完成。 - 如果可以,我想
actions.deleteSuccess())
。 - 如果失败,我想用
actions.deleteFailed()
触发错误等。
现在,问题是 - 此代码 有效 ,但 merge
对我来说感觉很糟糕。我认为我应该使用 concat
并将 of(actions.requestAnActionThatMayFail())
作为第一个参数。如果我更改流的顺序或使用 concat
,整个史诗将无法运行。任何想法可能是什么问题?
If I change the order of streams
我怀疑它不会以这种方式工作,因为 requestAnActionThatMayFail
是一个 同步操作 。这解释了原因,因为 merge
将按顺序 订阅提供的可观察对象 ,这意味着如果你有
merge(
actionFailed$,
of(actions.requestAnActionThatMayFail()),
actionSuccess$,
)
和requestAnActionThatMayFail
成功了,因为actionSuccess$
还没有被订阅(actionFailed$
被订阅),所以不会生效。如果 requestAnActionThatMayFail
是 异步的 ,你应该不会遇到这个问题。
or use concat
concat
是 merge
,concurrent
设置为 1。虽然 merge
可以有多个活动的内部可观察对象,但 concat
只能有一个。当设置 concurrent
时,merge
将缓冲超出的值,当一个内部可观察值 完成 时,它将采用最旧的缓冲值并创建一个内部可观察值
解释与上一节相同:其他可观察对象尚未订阅。
所以,这是我的方法:
const requestDelete = action$ => {
const actionSuccess$ = action$.pipe(
ofType(types.AN_ACTION_THAT_MAY_FAIL_SUCCESS),
take(1),
mergeMap(_ => of(actions.deleteSuccess()))
)
const actionFailed$ = action$.pipe(
ofType(types.AN_ACTION_THAT_MAY_FAIL_FAILED),
take(1),
mergeMap(_ => of(actions.deleteFailed()))
)
const requestDelete$ = action$.pipe(
ofType(types.REQUEST_DELETE),
mergeMap(action =>
from(SomeService.delete(action.payload)),
mergeMap(_ => of(actions.requestAnActionThatMayFail())),
catchError(err => of(actions.deleteFailed(err)))
),
// This seems a little redundant because of the previous `catchError`
// catchError(err =>
// of({ type: 'CRITICAL_REQUEST_DELETE_ERROR', err })
// )
)
// Make sure all the obs. are susbcribed
return merge(actionSuccess$, actionFailed$, requestDelete$);
}