RxJS - 检查间隔内并发的可观察数组

RxJS - check array of observables with concurrency in interval

使用 RxJS 的调度程序工作,每秒检查作业数组。作业完成后,它会从数组中删除。我想 运行 使用 .mergeAll(concurrency) 参数,例如,同时只有两个作业 运行ning。 目前我有一个可以看到的解决方法 here.

我正在尝试的是

Observable
  .interval(1000)
  .timeInterval()
  .merge(...jobProcesses.map(job => Observable.fromPromise(startJob(job.id))))
  .mergeAll(config.concurrency || 10)
  .subscribe();

这显然行不通。任何帮助将不胜感激。

从评论来看,你似乎只是想限制并发,而这个间隔的东西只是绕了个弯路。你应该能够得到你需要的东西:

const Rx = require('rxjs/Rx')

let startTime = 0
const time = () => {
    if (!startTime)
        startTime = new Date().getTime()
    return Math.round((new Date().getTime() - startTime) / 1000)
}

const jobs = new Rx.Subject() // You may additionally rate-limit this with bufferTime(x).concatAll()
const startJob = j => Rx.Observable.of(undefined).delay(j * 1000).map(() => time())
const concurrency = 2

time()
jobs
    .bufferCount(concurrency)
    .concatMap(buf => Rx.Observable.from(buf).flatMap(startJob))
    .subscribe(x => console.log(x))

Rx.Observable.from([3, 1, 3]).subscribe(jobs)

// The last job is only processed after the first two are completed, so you see:
// 1
// 3
// 6

请注意,从技术上讲,这并没有挤出最大可能的并发量,因为它将作业分成固定的批次。如果您的作业的处理时间明显不均匀,批次中最长的作业将延迟从下一批次中拉取工作。