在 RXJS 可观察对象中管理承诺

Managing promises in RXJS observables

我研究了 SO 并发现了许多类似的 questions/answers 但我可能在我对如何使用此堆栈的基本理解中遗漏了一些东西。

我正在与 RXJS/obervables 一起开发一个 React Native 项目。在某些时候我做文件下载,这部分不是问题。预先存在的 axios-rxjsreact-native-file-system 的组合让我到达我想要的地方。问题是我不确定如何在没有 async/await 的情况下干净地处理它,我知道这是一种反模式。

我想将此工作代码转换为干净的可观察式流程。

我已经实现了一个 Epic 来执行我想要的操作:

const myDownloadEpic = (
  action$,
  state$
) =>
  action$.pipe(
    ofType(myDownloadActionType), // catches the relevant action
    map(action => action.payload),
    mergeMap(payload =>
      downloadManager       // a class with the relevant utils to get files
        .download(payload), // this axios call returns my Blob file as Observable<Blob> 
        .pipe(
          mergeMap(async response => {

            // a promise is returned by RNFS to read the contents of a folder
            const files = await RNFS.readDir(RELEVANT_TARGET_PATH) 
...
            // a promise returned from a custom function that converts my blob to bas64
            const data = await convertToBase64(response) 

            // another promise returned by RNFS to write to the file system
            await RNFS.writeFile(FULL_PATH, data, 'base64');

...
          })
        )
    )
   )

我试过将其拆分为多个管道,例如,我尝试将 READ 操作拆分为前一个管道,但最终看起来非常冗长。有没有一种干净简单的方法来“持有”直到承诺完成,以便我可以根据他们的结果做出决定?

在这种情况下,什么会被认为更清洁?

from() 运算符可以将 promise 转换为 observable,后者将发出承诺的值然后完成。

如果您需要等到所有 promises 都已解决,我建议 forkJoin() 因为在所有 observable 完成之前它不会发出值。

最后,为了让代码更简洁一些,我还建议声明单独的 variables/functions 来为每个承诺定义您的可观察对象。

const files$ = from(RNFS.readDir(RELEVANT_TARGET_PATH));
const getData = (response: unknown) => from(convertToBase64(response)).pipe(
  mergeMap(data=>
    from(RNFS.writeFile(FULL_PATH, data, 'base64')).pipe(
      mapTo(data)
    )
  )
);

const myDownloadEpic = (action$, state$) =>
  action$.pipe(
    ofType(myDownloadActionType),
    map(({ payload }) => payload),
    mergeMap(payload => downloadManager.download(payload)),
    mergeMap(response =>
      forkJoin({
        files: files$,
        data: getData(response)
      })
    ),
    map(({ files, data }) => {
      // check something with `files` and `data`
    })
  );

我假设 RNFS.writeFile() 响应是 void,所以我在订阅 getData() 时将其作为效果。 mapTo() 运算符会忽略来自源可观察对象的任何发射值,以及 returns 您在参数中输入的任何值。

你可以尝试这样的事情。它应该大致相当于你上面写的。

const myDownloadEpic = (
  action$,
  state$
) => action$.pipe(
  ofType(myDownloadActionType),
  map(action => action.payload),

  mergeMap(payload => downloadManager.download(payload)),

  mergeMap(response => from(RNFS.readDir(RELEVANT_TARGET_PATH)).pipe(
    map(files => ({response, files}))
  )),

  mergeMap(values => from(convertToBase64(values.response)).pipe(
    map(data => ({...values, data}))
  )),

  mergeMap(({response, files, data}) => RNFS.writeFile(FULL_PATH, data, 'base64'))
);