React Actions 使用 RxJS 进行并行异步调用

React Actions using RxJS for parallel async calls

我有一个现有的操作调用 api 并将 ID 列表作为查询字符串参数,并获取传递的 ID 的缩略图响应。在某些情况下,ID 的数量可以是 150-200 条记录。因此,我正在努力使用 RxJs 的 forkJoinsubscribe 方法使这个 API 调用基于批处理和 运行 并行。我在实施方面面临以下 3 个问题。

  1. 我可以在下一个订阅方法中获得基于批处理的响应。但是,此函数中调用的操作以及响应未被调用。

  2. maxParallelQueries 似乎没有按预期工作。所有的 fetchThumbnailObservables() 都是一次执行的,而不是按常量/

    分批执行的 3 个
  3. 如何处理 return observable 以便 mergeMap 不给出语法错误。 Argument of type '({ payload }: IAction) => void' is not assignable to parameter of type '(value: IAction, index: number) => ObservableInput<any>'. Type 'void' is not assignable to type 'ObservableInput<any>'.ts(2345)

这是我当前的代码。任何帮助都会很好。

const getThumbnails: Epic<IAction, IAction, IStoreState> = (action$, state$) =>
    action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
        mergeMap( ({ payload }) => {
            
            const assetIds = Object.keys(payload);
            const meta = payload;

            const batchSize = 10;
            const maxParallelQueries = 3;
            
            const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];

            for (let count = 0; count < assetIds.length; count += batchSize) {
                fetchThumbnailObservables.push(AssetDataService.fetchThumbnailData(assetIds.slice(count, count + batchSize)));
            }

             forkJoin(fetchThumbnailObservables)
            .pipe(mergeMap(fetchThumbnailObservable => fetchThumbnailObservable, maxParallelQueries)) // Issue 1 :  maxParallelQueries limit is not working
            .subscribe({ 
                complete: () => { }, 
                error: () => { },
                next: (response) => { 
                     // Issue 2 :  below action dont seems to be getting invoked
                    actions.AssetActions.receivedThumbnailsAction(response, meta) 
                },     
             });
    
            // Issue 3 :  How to handle return of observable 
            // Added below code for prevennting error raised by mergeMap( ({ payload }) => {
            return new Observable<any>();
        }),
        catchError((error) => of(actions.Common.errorOccured({ error })))
    );
    ```

我不知道 Epics,但我想我可以给你一些关于如何解决这个问题的提示。

让我们从您有一个 ID 数组 assetIds 的事实开始,并且您希望为每个 ID 进行一个 http 调用,并发度受控 maxParallelQueries

在这种情况下,from 函数和 mergeMap 运算符是你的朋友。

你需要做的是

// from function transform an array of values, in this case, to a stream of values
from(assetIds).pipe(
  mergeMap(id => {
    // do whatever you need to do and eventually return an Observable
    return AssetDataService.fetchThumbnailData(id)
  }, maxParallelQueries) // the second parameter of mergeMap is the concurrency
)

通过这种方式,您应该能够调用 http 调用,保持已建立的并发限制

  1. I am able to get the batch based response in next method of subscribe. However, the action invoked in this function along with response is not getting invoked.

您不得在 redux-observable 中强制调用操作。相反,您需要将它们排队。 actions.AssetActions.receivedThumbnailsAction(response, meta) 将 return 一个简单的 object 形状 {type: "MY_ACTION", payload: ...} 而不是调度一个动作。您宁愿 return object 包裹在 mergeMap 内的可观察对象中,以便将下一个操作添加到队列中。 (见下面的解决方案)

  1. maxParallelQueries dont seems to be working as intended. All the fetchThumbnailObservables() are executed at once and not in the batches of 3 as set as constant/

初步猜测:AssetDataService.fetchThumbnailData(assetIds.slice(count, count + batchSize)) 的 return 类型是什么?如果它是 Promise 类型,那么 maxParallelQueries 没有效果,因为 promises 是急切的并且在 mergeMap 有机会控制请求之前创建时立即开始。因此,请确保 fetchThumbnailData return 是可观察的。如果您需要将 Promise 转换为 Observable,请使用 defer 而不是 from 以确保不会在 fetchThumbnailData.

中过早触发 promise

第二个猜测:forkJoin 正在触发所有请求,因此 mergeMap 甚至没有机会限制并行查询的数量。我很惊讶打字稿没有在这里警告你关于 mergeMap(fetchThumbnailObservable => fetchThumbnailObservable, ...) 中不正确的 return 类型。我下面的解决方案将 forJoin 替换为 from 并且应该启用您的 maxParallelQueries.

  1. How to handle return of observable so that mergeMap do not give syntax error . Argument of type '({ payload }: IAction) => void' is not assignable to parameter of type '(value: IAction, index: number) => ObservableInput'. Type 'void' is not assignable to type 'ObservableInput'.ts(2345)

这不是语法错误。在史诗中,您可以选择 return 一个空的可观察对象 return empty() 如果没有后续动作,或者通过 return 一个 IAction 类型的可观察对象来调度另一个动作。在这里,我(未经测试)尝试满足您的要求:

const getThumbnails: Epic<IAction, IAction, IStoreState> = (action$, state$) =>
  action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
    mergeMap( ({ payload }) => {
            
      const assetIds = Object.keys(payload);
      const meta = payload;

      const batchSize = 10;
      const maxParallelQueries = 3;
            
      const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];

      for (let count = 0; count < assetIds.length; count += batchSize) {
        fetchThumbnailObservables.push(
          AssetDataService.fetchThumbnailData(
             assetIds.slice(count, count + batchSize)
          )
        );
      }

      return from(fetchThumbnailObservables)
        .pipe(
          mergeMap(
            fetchThumbnailObservable => fetchThumbnailObservable,
            maxParallelQueries
          ),
          map((response) =>
            // dispatch action for each response
            actions.AssetActions.receivedThumbnailsAction(response, meta)
          ),
          // catch error early so epic continue on error
          catchError((error) => of(actions.Common.errorOccured({ error })))
        );
    })
  );

请注意,我将 catchError 移动到离请求更近的位置(在内部 forkJoin Observable 内),这样史诗不会在发生错误时终止。 Error Handling - redux observable 之所以如此,是因为当 catchError 处于活动状态时,它会用 catchError 编辑的 return 替换失败的可观察对象。而且我假设您不想在出现错误时替换整个史诗?

有一件事:redux-observable 为您处理订阅,因此不需要在您的应用程序中的任何地方调用 .subscribe.unsubscribe。您只需要关心可观察对象的生命周期。此处最重要的是要了解可观察对象何时开始、完成时会发生什么、何时完成或可观察对象抛出时会发生什么。