React Actions 使用 RxJS 进行并行异步调用
React Actions using RxJS for parallel async calls
我有一个现有的操作调用 api 并将 ID 列表作为查询字符串参数,并获取传递的 ID 的缩略图响应。在某些情况下,ID 的数量可以是 150-200 条记录。因此,我正在努力使用 RxJs 的 forkJoin
和 subscribe
方法使这个 API 调用基于批处理和 运行 并行。我在实施方面面临以下 3 个问题。
我可以在下一个订阅方法中获得基于批处理的响应。但是,此函数中调用的操作以及响应未被调用。
maxParallelQueries 似乎没有按预期工作。所有的 fetchThumbnailObservables() 都是一次执行的,而不是按常量/
分批执行的 3 个
如何处理 return observable 以便 mergeMap 不给出语法错误。 Argument of type '({ payload }: IAction) => void' is not assignable to parameter of type '(value: IAction, index: number) => ObservableInput<any>'. Type 'void' is not assignable to type 'ObservableInput<any>'.ts(2345)
这是我当前的代码。任何帮助都会很好。
const getThumbnails: Epic<IAction, IAction, IStoreState> = (action$, state$) =>
action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
mergeMap( ({ payload }) => {
const assetIds = Object.keys(payload);
const meta = payload;
const batchSize = 10;
const maxParallelQueries = 3;
const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];
for (let count = 0; count < assetIds.length; count += batchSize) {
fetchThumbnailObservables.push(AssetDataService.fetchThumbnailData(assetIds.slice(count, count + batchSize)));
}
forkJoin(fetchThumbnailObservables)
.pipe(mergeMap(fetchThumbnailObservable => fetchThumbnailObservable, maxParallelQueries)) // Issue 1 : maxParallelQueries limit is not working
.subscribe({
complete: () => { },
error: () => { },
next: (response) => {
// Issue 2 : below action dont seems to be getting invoked
actions.AssetActions.receivedThumbnailsAction(response, meta)
},
});
// Issue 3 : How to handle return of observable
// Added below code for prevennting error raised by mergeMap( ({ payload }) => {
return new Observable<any>();
}),
catchError((error) => of(actions.Common.errorOccured({ error })))
);
```
我不知道 Epics,但我想我可以给你一些关于如何解决这个问题的提示。
让我们从您有一个 ID 数组 assetIds
的事实开始,并且您希望为每个 ID 进行一个 http 调用,并发度受控 maxParallelQueries
。
在这种情况下,from
函数和 mergeMap
运算符是你的朋友。
你需要做的是
// from function transform an array of values, in this case, to a stream of values
from(assetIds).pipe(
mergeMap(id => {
// do whatever you need to do and eventually return an Observable
return AssetDataService.fetchThumbnailData(id)
}, maxParallelQueries) // the second parameter of mergeMap is the concurrency
)
通过这种方式,您应该能够调用 http 调用,保持已建立的并发限制
- I am able to get the batch based response in next method of subscribe. However, the action invoked in this function along with response is not getting invoked.
您不得在 redux-observable 中强制调用操作。相反,您需要将它们排队。
actions.AssetActions.receivedThumbnailsAction(response, meta)
将 return 一个简单的 object 形状 {type: "MY_ACTION", payload: ...}
而不是调度一个动作。您宁愿 return object 包裹在 mergeMap
内的可观察对象中,以便将下一个操作添加到队列中。 (见下面的解决方案)
- maxParallelQueries dont seems to be working as intended. All the fetchThumbnailObservables() are executed at once and not in the batches of 3 as set as constant/
初步猜测:AssetDataService.fetchThumbnailData(assetIds.slice(count, count + batchSize))
的 return 类型是什么?如果它是 Promise
类型,那么 maxParallelQueries
没有效果,因为 promises 是急切的并且在 mergeMap
有机会控制请求之前创建时立即开始。因此,请确保 fetchThumbnailData
return 是可观察的。如果您需要将 Promise 转换为 Observable,请使用 defer
而不是 from
以确保不会在 fetchThumbnailData
.
中过早触发 promise
第二个猜测:forkJoin
正在触发所有请求,因此 mergeMap
甚至没有机会限制并行查询的数量。我很惊讶打字稿没有在这里警告你关于 mergeMap(fetchThumbnailObservable => fetchThumbnailObservable, ...)
中不正确的 return 类型。我下面的解决方案将 forJoin
替换为 from
并且应该启用您的 maxParallelQueries
.
- How to handle return of observable so that mergeMap do not give syntax error . Argument of type '({ payload }: IAction) => void' is not assignable to parameter of type '(value: IAction, index: number) => ObservableInput'. Type 'void' is not assignable to type 'ObservableInput'.ts(2345)
这不是语法错误。在史诗中,您可以选择 return 一个空的可观察对象 return empty()
如果没有后续动作,或者通过 return 一个 IAction
类型的可观察对象来调度另一个动作。在这里,我(未经测试)尝试满足您的要求:
const getThumbnails: Epic<IAction, IAction, IStoreState> = (action$, state$) =>
action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
mergeMap( ({ payload }) => {
const assetIds = Object.keys(payload);
const meta = payload;
const batchSize = 10;
const maxParallelQueries = 3;
const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];
for (let count = 0; count < assetIds.length; count += batchSize) {
fetchThumbnailObservables.push(
AssetDataService.fetchThumbnailData(
assetIds.slice(count, count + batchSize)
)
);
}
return from(fetchThumbnailObservables)
.pipe(
mergeMap(
fetchThumbnailObservable => fetchThumbnailObservable,
maxParallelQueries
),
map((response) =>
// dispatch action for each response
actions.AssetActions.receivedThumbnailsAction(response, meta)
),
// catch error early so epic continue on error
catchError((error) => of(actions.Common.errorOccured({ error })))
);
})
);
请注意,我将 catchError
移动到离请求更近的位置(在内部 forkJoin Observable 内),这样史诗不会在发生错误时终止。 Error Handling - redux observable 之所以如此,是因为当 catchError
处于活动状态时,它会用 catchError
编辑的 return 替换失败的可观察对象。而且我假设您不想在出现错误时替换整个史诗?
有一件事:redux-observable 为您处理订阅,因此不需要在您的应用程序中的任何地方调用 .subscribe
或 .unsubscribe
。您只需要关心可观察对象的生命周期。此处最重要的是要了解可观察对象何时开始、完成时会发生什么、何时完成或可观察对象抛出时会发生什么。
我有一个现有的操作调用 api 并将 ID 列表作为查询字符串参数,并获取传递的 ID 的缩略图响应。在某些情况下,ID 的数量可以是 150-200 条记录。因此,我正在努力使用 RxJs 的 forkJoin
和 subscribe
方法使这个 API 调用基于批处理和 运行 并行。我在实施方面面临以下 3 个问题。
我可以在下一个订阅方法中获得基于批处理的响应。但是,此函数中调用的操作以及响应未被调用。
maxParallelQueries 似乎没有按预期工作。所有的 fetchThumbnailObservables() 都是一次执行的,而不是按常量/
分批执行的 3 个如何处理 return observable 以便 mergeMap 不给出语法错误。
Argument of type '({ payload }: IAction) => void' is not assignable to parameter of type '(value: IAction, index: number) => ObservableInput<any>'. Type 'void' is not assignable to type 'ObservableInput<any>'.ts(2345)
这是我当前的代码。任何帮助都会很好。
const getThumbnails: Epic<IAction, IAction, IStoreState> = (action$, state$) =>
action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
mergeMap( ({ payload }) => {
const assetIds = Object.keys(payload);
const meta = payload;
const batchSize = 10;
const maxParallelQueries = 3;
const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];
for (let count = 0; count < assetIds.length; count += batchSize) {
fetchThumbnailObservables.push(AssetDataService.fetchThumbnailData(assetIds.slice(count, count + batchSize)));
}
forkJoin(fetchThumbnailObservables)
.pipe(mergeMap(fetchThumbnailObservable => fetchThumbnailObservable, maxParallelQueries)) // Issue 1 : maxParallelQueries limit is not working
.subscribe({
complete: () => { },
error: () => { },
next: (response) => {
// Issue 2 : below action dont seems to be getting invoked
actions.AssetActions.receivedThumbnailsAction(response, meta)
},
});
// Issue 3 : How to handle return of observable
// Added below code for prevennting error raised by mergeMap( ({ payload }) => {
return new Observable<any>();
}),
catchError((error) => of(actions.Common.errorOccured({ error })))
);
```
我不知道 Epics,但我想我可以给你一些关于如何解决这个问题的提示。
让我们从您有一个 ID 数组 assetIds
的事实开始,并且您希望为每个 ID 进行一个 http 调用,并发度受控 maxParallelQueries
。
在这种情况下,from
函数和 mergeMap
运算符是你的朋友。
你需要做的是
// from function transform an array of values, in this case, to a stream of values
from(assetIds).pipe(
mergeMap(id => {
// do whatever you need to do and eventually return an Observable
return AssetDataService.fetchThumbnailData(id)
}, maxParallelQueries) // the second parameter of mergeMap is the concurrency
)
通过这种方式,您应该能够调用 http 调用,保持已建立的并发限制
- I am able to get the batch based response in next method of subscribe. However, the action invoked in this function along with response is not getting invoked.
您不得在 redux-observable 中强制调用操作。相反,您需要将它们排队。
actions.AssetActions.receivedThumbnailsAction(response, meta)
将 return 一个简单的 object 形状 {type: "MY_ACTION", payload: ...}
而不是调度一个动作。您宁愿 return object 包裹在 mergeMap
内的可观察对象中,以便将下一个操作添加到队列中。 (见下面的解决方案)
- maxParallelQueries dont seems to be working as intended. All the fetchThumbnailObservables() are executed at once and not in the batches of 3 as set as constant/
初步猜测:AssetDataService.fetchThumbnailData(assetIds.slice(count, count + batchSize))
的 return 类型是什么?如果它是 Promise
类型,那么 maxParallelQueries
没有效果,因为 promises 是急切的并且在 mergeMap
有机会控制请求之前创建时立即开始。因此,请确保 fetchThumbnailData
return 是可观察的。如果您需要将 Promise 转换为 Observable,请使用 defer
而不是 from
以确保不会在 fetchThumbnailData
.
第二个猜测:forkJoin
正在触发所有请求,因此 mergeMap
甚至没有机会限制并行查询的数量。我很惊讶打字稿没有在这里警告你关于 mergeMap(fetchThumbnailObservable => fetchThumbnailObservable, ...)
中不正确的 return 类型。我下面的解决方案将 forJoin
替换为 from
并且应该启用您的 maxParallelQueries
.
- How to handle return of observable so that mergeMap do not give syntax error . Argument of type '({ payload }: IAction) => void' is not assignable to parameter of type '(value: IAction, index: number) => ObservableInput'. Type 'void' is not assignable to type 'ObservableInput'.ts(2345)
这不是语法错误。在史诗中,您可以选择 return 一个空的可观察对象 return empty()
如果没有后续动作,或者通过 return 一个 IAction
类型的可观察对象来调度另一个动作。在这里,我(未经测试)尝试满足您的要求:
const getThumbnails: Epic<IAction, IAction, IStoreState> = (action$, state$) =>
action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
mergeMap( ({ payload }) => {
const assetIds = Object.keys(payload);
const meta = payload;
const batchSize = 10;
const maxParallelQueries = 3;
const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];
for (let count = 0; count < assetIds.length; count += batchSize) {
fetchThumbnailObservables.push(
AssetDataService.fetchThumbnailData(
assetIds.slice(count, count + batchSize)
)
);
}
return from(fetchThumbnailObservables)
.pipe(
mergeMap(
fetchThumbnailObservable => fetchThumbnailObservable,
maxParallelQueries
),
map((response) =>
// dispatch action for each response
actions.AssetActions.receivedThumbnailsAction(response, meta)
),
// catch error early so epic continue on error
catchError((error) => of(actions.Common.errorOccured({ error })))
);
})
);
请注意,我将 catchError
移动到离请求更近的位置(在内部 forkJoin Observable 内),这样史诗不会在发生错误时终止。 Error Handling - redux observable 之所以如此,是因为当 catchError
处于活动状态时,它会用 catchError
编辑的 return 替换失败的可观察对象。而且我假设您不想在出现错误时替换整个史诗?
有一件事:redux-observable 为您处理订阅,因此不需要在您的应用程序中的任何地方调用 .subscribe
或 .unsubscribe
。您只需要关心可观察对象的生命周期。此处最重要的是要了解可观察对象何时开始、完成时会发生什么、何时完成或可观察对象抛出时会发生什么。