使用 Rxjs 的并发 Ajax 请求

Concurrent Ajax requests with Rxjs

我目前正在从 promises 切换到 observables。我正在为我的 React 应用程序使用 Redux-Observable。基本上,我正在寻找最好的运算符,它可以在所有可观察对象成功完成执行时启用多个并发 ajax 调用和 return 响应。 这是我的应用程序中的代码片段。

let epicPostAd = (action$, store, {ajax}) =>
  action$.ofType(POST_AD)
   .debounceTime(1000)
   .mergeMap(({ payload }) =>
     ajax(generateAjaxRequestSetting(POSTS_URL, 'post', payload,CT_JSON))
      .map(response => postAdSuccessful(response))
      .catch(e => Observable.of(postAdUnsuccessful(e.xhr.response)))
      .takeUntil(action$.ofType(LOCATION_CHANGE))
    )

这是一个简单的 ajax 请求,它发布给定的广告并在响应为 201 时调度 POST_AD_SUCCESSFUL,否则在出错时调度 POST_AD_UNSUCCESSFUL。 但问题是我想在有响应时创建后续的 ajax 可观察流。比如

.map(response => /* start a stream of ajax observables then process the response */)

如果您告诉我实现此目标的最佳方法,我将不胜感激。

听起来您正在寻找 forkJoin operator

它会订阅你传递给它的所有 Observables,在它们 all 完成后,它会发出数组中每个 Observables 的最后一个值。

不完全清楚您想在 Epic 中的哪个位置执行此操作,所以我只是举了一个通用示例:

const somethingEpic = (action$, store, { ajax }) =>
  action$.ofType(SOMETHING)
    .mergeMap(() =>
      Observable.forkJoin(
        ajax('/first'),
        ajax('/second'),
        ajax('/third')
      )
      .do(results => {
        // the results is an array, containing each
        const [first, second, third] = results;
        console.log(first, second, third);
      })
      .map(results => ({
        type: 'SOME_RESULTS',
        results
      }))
    );

从技术上讲,它支持最终的 resultSelector 参数,您可以使用它而不是在它后面使用 map 运算符,但我倾向于不使用它,因为我发现它不太清楚,只有在常见的 redux-observable 样式案例中,性能优势可以忽略不计。但知道它仍然很好。对于更多 "data-normalization" 的东西比 "transform this into an action" 的东西更方便。

const somethingEpic = (action$, store, { ajax }) =>
  action$.ofType(SOMETHING)
    .mergeMap(() =>
      Observable.forkJoin(
        ajax('/first'),
        ajax('/second'),
        ajax('/third'),
        results => ({
          type: 'SOME_RESULTS',
          results
        })
      )
    );

此外,如果您问自己 "what operator do I use?",您应该尝试文档中的运算符向导:http://reactivex.io/rxjs/

向下滚动到显示以下内容的部分:

Do you need to find an operator for your problem? Start by choosing an option from the list below:

  • I have one existing Observable, and...
  • I have some Observables to combine together as one Observable, and...
  • I have no Observables yet, and...

Hint: open your DevTools to experiment with RxJS.

虽然在这种情况下,forkJoin 是正确的建议,但当您单击它时,它还没有记录:sadface: 但是 google 搜索会显示许多不同的网站来解释它是什么做什么以及如何使用它(在 RxJS 和其他语言的其他 Rx 实现中)。喜欢this helpful website

这是我自己问题的答案。虽然 JayPhelps 回答了,但我意识到我的问题并没有那么清楚。使用 Jay 的推荐。我想出了以下内容:

let epicPostAd = (action$, store, {ajax, observable}) =>
  action$.ofType(POST_AD)
   .debounceTime(1000)
   .mergeMap(({ payload }) =>
     ajax(generateAjaxRequestSetting(POSTS_URL, 'post', payload, CT_JSON))
      .mergeMap(response =>
        observable.forkJoin(
          ajax(''),
          ajax('')
        )
      .do(res => {
        const [first, second, third] = results;
        console.log(first, second, third);
      })
      .map(res => postAdSuccessful(res))
    )
    .catch(e => observable.of(postAdUnsuccessful(e.xhr.response)))
    .takeUntil(action$.ofType(LOCATION_CHANGE))

)

这里是它的工作原理。我发出了一个 post 请求,并且在 ajax 请求完成执行后我立即 .mergeMap 使用 .forkJoin() 对 ajax 可观察数据流的响应。然后处理结果