使用 RxJS 进行批处理?

Batching using RxJS?

我猜这应该比较容易实现,但我在(我猜是概念上的)弄清楚如何解决它时遇到了麻烦。

我有一个 API returns 一个 JSON 对象数组。我需要遍历这些对象,并且对每个对象进行另一个 AJAX 调用。问题是处理每个 AJAX 调用的系统一次只能处理两个活动调用(因为它是一项相当 CPU 密集型任务,需要连接到桌面应用程序)。

我想知道如何使用 RxJS(使用版本 5 或 4)实现这一点?

编辑:此外,是否可以同时执行一系列步骤 运行ning。即

Downloading File: 1 Processing File: 1 Converting File: 1 Uploading File: 1 Downloading File: 2 Processing File: 2 Converting File: 2 Uploading File: 2 Downloading File: 3 Processing File: 3 Converting File: 3 Uploading File: 3

我试过做类似的事情:

Rx.Observable.fromPromise(start())
    .concatMap(arr => Rx.Observable.from(arr))
    .concatMap(x => downloadFile(x))
    .concatMap((entry) => processFile(entry))
    .concatMap((entry) => convertFile(entry))
    .concatMap((entry) => UploadFile(entry))
    .subscribe(
        data => console.log('data', new Date().getTime(), data),
        error => logger.warn('err', error),
        complete => logger.info('complete')
    );

但这似乎不起作用。例如,downloadFile 不会等待 processFile、convertFile 和 uploadFile 全部完成,而是下一个将在前一个完成后立即再次 运行。

这样的事情怎么样?您可以使用 from 将数组分成一口大小的块,然后使用 concatMap 将它们一一处理。

function getArr() {
    return Rx.Observable.of([1, 2, 3, 4, 5, 6, 7, 8]);
}


function processElement(element) {
    return Rx.Observable.of(element)
        .delay(500);
}


getArr()
    .concatMap(arr => {
        return Rx.Observable.from(arr);
    })
    .concatMap(element => {
        return processElement(element);
    })
    .subscribe(res => {
        console.log(res);
    });

您可以将 merge 运算符与 maxConcurrency 重载(Rxjs v4)一起使用,例如:

Rx.Observable.fromArray(aJSONs)
  .map (function (JSONObject) {
    return ajaxRequest(JSONObject) // that would be an observable (or a promise)
  })
  .merge(2)

您可以在以下网址查看其他使用示例:

  • ,
  • How to limit the concurrency of flatMap?

官方文档:

这里有 2 种方法,如果你想要完全像这样的请求顺序

Downloading File: 1
Processing File: 1
Converting File: 1
Uploading File: 1
Downloading File: 2
Processing File: 2
...

你需要在单个 concatMap 方法中解决所有的承诺,就像这样

Rx.Observable.fromPromise(getJSONOfAjaxRequests())
  .flatMap(function(x) { return x;})
  .concatMap(function(item) {
    return downloadFile(item)
      .then(processFile)
      .then(convertFile);
  })
  .subscribe(function(data) {
    console.log(data);
  });

在此处查看工作插件:https://plnkr.co/edit/iugdlC2PpW3NeNF2yLzS?p=preview 这样,新的 ajax 调用只会在前一个调用完成后发送。

另一种方法是允许文件并行发送请求,但操作 'downloading,processing,converting,uploading' 将按顺序进行。为此,您可以通过

让它工作
Rx.Observable.fromPromise(getJSONOfAjaxRequests())
  .flatMap(function(x) { return x;})
  .merge(2)  // in case maximum concurrency required is 2
  .concatMap(function(item) {
    return downloadFile(item);
  })
  .concatMap(function(item) {
    return processFile(item);
  })
  .concatMap(function(item) {
    return convertFile(item)
  })
  .subscribe(function(data) {
    //console.log(data);
  });

请在此处查看 plunkr:https://plnkr.co/edit/mkDj6Q7lt72jZKQk8r0p?p=preview

旧 post 但我相信这可行,对于控制台日志,我们可以使用 tap。笔记编辑器会通过智能感知错误,因为 from 需要一个数组,但代码应该可以工作。

 from(start()).pipe(
    switchMap(files => from(files).pipe(
       switchMap(file => from(downloadFile(file)).pipe(
         map(_ => ({file: file, downloaded: true}))
       )),
       switchMap(attr => from(processFile(attr.file)).pipe(
         map(_ => ({...attr, downloaded: true}))
       )),
       switchMap(attr => from(convertFile(attr.file)).pipe(
         map(_ => ({...attr, converted: true}))
       )),
       switchMap(attr => from(uploadFile(attr.file)).pipe(
         map(_ => ({...attr, uploaded: true}))
       ))
    ))
 ).subscribe(_ => {})