使用 Rxjs 并行调用同时延迟的请求块?

Using Rxjs to invoke simultaneous chunks of requests in parallel with delay?

我有一个任务队列(长度为 20),其中每个任务都是一个要调用的 ajax 请求。

我想:

1) 创建 5 个块(20/5 =4 个块)
2) 执行每个块,其中每个 中的项目将以 1000 毫秒的延迟执行。
3) 当每个块项目完成时,等待 3 秒。

所以 :

1..1sec( ↦ 绿色)..2..1sec( ↦ 绿色)..3..1sec( ↦ 绿色)..4..1sec( ↦ 绿色)..5 .................. .......3sec.......
6..1sec( ↦ 绿色)..7..1sec( ↦ 绿色)..8..1sec( ↦ 绿色)..9..1sec( ↦ 绿色)..10 ..................... .. 3sec........ ... 11..1sec( ↦ 绿色)..12..1sec( ↦ 绿色)..13..1sec( ↦ 绿色)..14..1sec( ↦ 绿色)..15 .................. .. 3秒........
16..1sec( ↦ 绿色)..17..1sec( ↦ 绿色)..18..1sec( ↦ 绿色)..19..1sec( ↦ 绿色)..20

我确实做了一些接近的事情:

有:

from(this.httpCodes)
      .pipe(bufferCount(5),
       concatMap((i, y) => from(i).pipe(mergeMap(f => {
                                    this.httpCodes[f.index].wasExecuted = true;
                                     return this.ajaxAlike(f.data).pipe(
                                                               catchError(() => { return of(-1) }),
                                                               map((r) => ({ res: r, data: f }))
                                                                      )
                                                      }) 
                                        ,delay(3000) )),

      )

但它没有按我的预期执行。我没有看到块

中每个项目之间的延迟

问题:

为什么我看到这么多请求,以及如何更改我的代码,以便块中的每个项目将以 1 秒的延迟执行(绿色应该在每一秒后出现),并且 - 在每个块之后, 等待 3 秒?

Online Demo

编辑:好的,我想现在我明白你的意思了。我更新了 fiddle,因此所有 chunk-parts 都可以并行执行,但块会相互等待完成。所以这应该可以解决问题:

const chunkSize = 5;
const requestsWithDelay = httpRequests.map(obs => obs.delay(1000));

let chunks = [];
for (let i = 0; i < requestsWithDelay.length; i += chunkSize) {
    chunks.push(requestsWithDelay.slice(i, i + chunkSize));
}

const chunkedRequests = chunks.map(chunk => Rx.Observable.forkJoin(...chunk).delay(3000));
const parallelChunkRequest = Rx.Observable.concat(...chunkedRequests);
parallelChunkRequest.subscribe();

原答案:

像这样的东西会给你想要的延迟(给定 httpRequests 作为一个 observables 数组):

const requestsWithDelay = httpRequests.map((obs, idx) => { 
  let msDelay = 1000;
  if ((idx + 1) % 5 === 0 && idx < httpRequests.length - 1) {
    msDelay = 3000;
  }

  return obs.delay(msDelay);
});

const request = Rx.Observable.concat(...requestsWithDelay);

这应该可行,但不会有 "actual" 块可观察对象。每个块中的请求不会并行执行(就像使用 mergeMap 一样),而是连续执行。

要获得 httpRequests 的可观察性,您可以这样做(但没有管道延迟):

const httpRequests = this.httpCodes.map(data => this.ajaxAlike(data));

但是如果你想让块并行执行,你可以这样做:

let chunks = [];
for (let i = 0; i < requestsWithDelay.length; i += 5) {
    chunks.push(requestsWithDelay.slice(i, i + 5));
}

const chunkedRequests = chunks.map(chunk => Rx.Observable.concat(...chunk));
const parallelChunkRequest = Rx.Observable.merge(...chunkedRequests);

我创建了一个demo Fiddle

但是,如果它们是并行执行的并且不相互等待,为什么每个块之后需要 3 秒的延迟?

delay 运算符延迟发出的项目。似乎您希望它发出该项目,然后 'sleep' 持续 3 秒,然后再发出下一个。为此,您可以 concat 一个空的延迟可观察对象

您可以创建以下 pipeable sleep 运算符:

const sleep = ms => concat(Rx.Observable.empty().pipe(delay(ms)))

并按如下方式使用:

const {concatMap, concat, delay, bufferCount} = Rx.operators;

const sleep = ms => concat(Rx.Observable.empty().pipe(delay(ms)));

const ajaxAlike = call => Rx.Observable.of(call).pipe(delay(500));

Rx.Observable.range(0, 20).pipe(
  bufferCount(5),
  concatMap(calls => 
    Rx.Observable.from(calls).pipe(
      concatMap(call => ajaxAlike(call).pipe(sleep(1000))),
      sleep(3000)
    )
  )
)
.subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.7/Rx.js"></script>