RxJS - 检查间隔内并发的可观察数组
RxJS - check array of observables with concurrency in interval
使用 RxJS 的调度程序工作,每秒检查作业数组。作业完成后,它会从数组中删除。我想 运行 使用 .mergeAll(concurrency) 参数,例如,同时只有两个作业 运行ning。
目前我有一个可以看到的解决方法 here.
我正在尝试的是
Observable
.interval(1000)
.timeInterval()
.merge(...jobProcesses.map(job => Observable.fromPromise(startJob(job.id))))
.mergeAll(config.concurrency || 10)
.subscribe();
这显然行不通。任何帮助将不胜感激。
从评论来看,你似乎只是想限制并发,而这个间隔的东西只是绕了个弯路。你应该能够得到你需要的东西:
const Rx = require('rxjs/Rx')
let startTime = 0
const time = () => {
if (!startTime)
startTime = new Date().getTime()
return Math.round((new Date().getTime() - startTime) / 1000)
}
const jobs = new Rx.Subject() // You may additionally rate-limit this with bufferTime(x).concatAll()
const startJob = j => Rx.Observable.of(undefined).delay(j * 1000).map(() => time())
const concurrency = 2
time()
jobs
.bufferCount(concurrency)
.concatMap(buf => Rx.Observable.from(buf).flatMap(startJob))
.subscribe(x => console.log(x))
Rx.Observable.from([3, 1, 3]).subscribe(jobs)
// The last job is only processed after the first two are completed, so you see:
// 1
// 3
// 6
请注意,从技术上讲,这并没有挤出最大可能的并发量,因为它将作业分成固定的批次。如果您的作业的处理时间明显不均匀,批次中最长的作业将延迟从下一批次中拉取工作。
使用 RxJS 的调度程序工作,每秒检查作业数组。作业完成后,它会从数组中删除。我想 运行 使用 .mergeAll(concurrency) 参数,例如,同时只有两个作业 运行ning。 目前我有一个可以看到的解决方法 here.
我正在尝试的是
Observable
.interval(1000)
.timeInterval()
.merge(...jobProcesses.map(job => Observable.fromPromise(startJob(job.id))))
.mergeAll(config.concurrency || 10)
.subscribe();
这显然行不通。任何帮助将不胜感激。
从评论来看,你似乎只是想限制并发,而这个间隔的东西只是绕了个弯路。你应该能够得到你需要的东西:
const Rx = require('rxjs/Rx')
let startTime = 0
const time = () => {
if (!startTime)
startTime = new Date().getTime()
return Math.round((new Date().getTime() - startTime) / 1000)
}
const jobs = new Rx.Subject() // You may additionally rate-limit this with bufferTime(x).concatAll()
const startJob = j => Rx.Observable.of(undefined).delay(j * 1000).map(() => time())
const concurrency = 2
time()
jobs
.bufferCount(concurrency)
.concatMap(buf => Rx.Observable.from(buf).flatMap(startJob))
.subscribe(x => console.log(x))
Rx.Observable.from([3, 1, 3]).subscribe(jobs)
// The last job is only processed after the first two are completed, so you see:
// 1
// 3
// 6
请注意,从技术上讲,这并没有挤出最大可能的并发量,因为它将作业分成固定的批次。如果您的作业的处理时间明显不均匀,批次中最长的作业将延迟从下一批次中拉取工作。