有条件地激活 rxjs 可观察速率限制器
conditionally active rxjs observable rate limiter
我想为可观察对象创建一个速率限制器,它有条件地改变它限制值的方式。该用例适用于不断接收要下载的新 URL 的下载器。我希望能够对传入的 url 进行排队并在两种排队方法之间切换。这两种方法是限制速率(例如,每 2 秒不超过 10 个请求)和并发请求数(例如,一次最多可以触发 10 个请求)。
一个简单的速率限制器可以像下面这样实现(借用here):
const rateLimit = (limit, rate, scheduler = asyncScheduler) => {
let tokens = limit
const tokenChanged = new BehaviorSubject(tokens)
const consumeToken = () => tokenChanged.next(--tokens)
const renewToken = () => tokenChanged.next(++tokens)
const availableTokens = tokenChanged.pipe(filter(() => tokens > 0))
return source =>
source.pipe(
mergeMap(val =>
availableTokens.pipe(
take(1),
map(() => {
consumeToken()
timer(rate, scheduler).subscribe(renewToken)
return val
})
)
)
)
}
const o = urlSource.pipe(
rateLimit(10, 2000),
mergeMap(downloadUrl)
)
o.toPromise()
这是一个简单的并发限制器:
const o = urlSource.pipe(
mergeMap(downloadUrl, maxConcurrent)
)
o.toPromise()
最后,我可以创建这个组合切换器来选择要使用的限制器类型:
const toggleableLimiter = (func, limit, rate, concurrent, toggleObservable) => {
let useRateLimiter = true
toggleObservable.subscribe(() => (useRateLimiter = !useRateLimiter))
const rateLimiter = rateLimit(limit, rate)
return source => {
const operators = useRateLimiter
? [rateLimiter, mergeMap(func)]
: [mergeMap(func, concurrent)]
return source.pipe(...operators)
}
}
const e = new EventEmitter()
const toggler = fromEvent(e, 'toggle')
const o = urlSource.pipe(
toggleableLimiter(downloadUrl, 2, 1000, 2, toggler)
)
o.toPromise()
// using rate limiter
e.emit('toggle')
// incoming values now use concurrent limiter
这一切都可以很好地解决我的问题。我可以使用事件发射器在两种方法之间切换。然而,问题在于,在 事件发出之前传递给 toggleableLimiter
的任何内容都必须遵守该限制器运算符。我想知道的是,我是否可以有条件地将值保留在队列中,并随心所欲地选择如何限制排队的值。
好的!我有一个解决方案,不幸的是它涉及到我自己处理队列。由于可观察量和背压的性质,我认为这是必要的,我在这个 issue. In the past this could have been handled more simply with the controlled
operator, but it is deprecated 中发现了很多关于这个主题的讨论。相反,我只是简单地使用了一个 timer
observable 并自己包装了 mergeMap
的并发控制(尽管 mergeMap
仍然在内部管理并发作为安全措施。
const rateLimitToggle = (func, limit, rate, maxConcurrent, toggler) => {
const rateTimer = Rx.timer(0, rate).pipe(ops.mapTo(true))
return source =>
new Rx.Observable(subscriber => {
const concurrentLimiter = new Rx.Subject()
// stateful vars
const queue = []
let inProgress = 0
let closed = false
const enqueue = val => {
queue.push(val)
concurrentLimiter.next()
}
const dequeue = useRateLimit => {
const availableSlots = useRateLimit ? limit : maxConcurrent - inProgress
const numberToDequeue = Math.min(availableSlots, queue.length)
const nextVals = queue.splice(0, numberToDequeue)
inProgress += availableSlots
return nextVals
}
Rx.merge(Rx.of(true), toggler)
.pipe(
ops.switchMap(useRateLimiter => (useRateLimiter ? rateTimer : concurrentLimiter)),
ops.takeWhile(() => !closed || queue.length),
ops.mergeMap(dequeue),
ops.mergeMap(val => func(val), maxConcurrent)
)
.subscribe(val => {
inProgress--
concurrentLimiter.next()
subscriber.next(val)
})
source.subscribe({
next(val) {
enqueue(val)
},
complete() {
closed = true
}
})
})
}
用法示例:
const timeout = n => val => {
console.log('started', val)
return new Promise(resolve => setTimeout(() => resolve(val), n))
}
const emitter = new EventEmitter()
const toggler = Rx.fromEvent(emitter, 'useRateLimiter')
const downloader = Rx.range(0, 10).pipe(
rateLimitToggle(timeout(1000), 2, 1000, 10, toggler)
)
downloader.subscribe(val => console.log('finished', val))
setTimeout(() => {
console.log('now use concurrentLimiter')
emitter.emit('useRateLimiter', false)
}, 2000)
/* outputs:
started 0
started 1
started 2
started 3 // 0 - 3 all executed under rateLimiter
finished 0
finished 1
now use concurrentLimiter
started 4 // 4 - 9 executed under concurrentLimiter
started 5
started 6
started 7
started 8
started 9
finished 2
finished 3
finished 4
finished 5
finished 6
finished 7
finished 8
finished 9
*/
我想为可观察对象创建一个速率限制器,它有条件地改变它限制值的方式。该用例适用于不断接收要下载的新 URL 的下载器。我希望能够对传入的 url 进行排队并在两种排队方法之间切换。这两种方法是限制速率(例如,每 2 秒不超过 10 个请求)和并发请求数(例如,一次最多可以触发 10 个请求)。
一个简单的速率限制器可以像下面这样实现(借用here):
const rateLimit = (limit, rate, scheduler = asyncScheduler) => {
let tokens = limit
const tokenChanged = new BehaviorSubject(tokens)
const consumeToken = () => tokenChanged.next(--tokens)
const renewToken = () => tokenChanged.next(++tokens)
const availableTokens = tokenChanged.pipe(filter(() => tokens > 0))
return source =>
source.pipe(
mergeMap(val =>
availableTokens.pipe(
take(1),
map(() => {
consumeToken()
timer(rate, scheduler).subscribe(renewToken)
return val
})
)
)
)
}
const o = urlSource.pipe(
rateLimit(10, 2000),
mergeMap(downloadUrl)
)
o.toPromise()
这是一个简单的并发限制器:
const o = urlSource.pipe(
mergeMap(downloadUrl, maxConcurrent)
)
o.toPromise()
最后,我可以创建这个组合切换器来选择要使用的限制器类型:
const toggleableLimiter = (func, limit, rate, concurrent, toggleObservable) => {
let useRateLimiter = true
toggleObservable.subscribe(() => (useRateLimiter = !useRateLimiter))
const rateLimiter = rateLimit(limit, rate)
return source => {
const operators = useRateLimiter
? [rateLimiter, mergeMap(func)]
: [mergeMap(func, concurrent)]
return source.pipe(...operators)
}
}
const e = new EventEmitter()
const toggler = fromEvent(e, 'toggle')
const o = urlSource.pipe(
toggleableLimiter(downloadUrl, 2, 1000, 2, toggler)
)
o.toPromise()
// using rate limiter
e.emit('toggle')
// incoming values now use concurrent limiter
这一切都可以很好地解决我的问题。我可以使用事件发射器在两种方法之间切换。然而,问题在于,在 事件发出之前传递给 toggleableLimiter
的任何内容都必须遵守该限制器运算符。我想知道的是,我是否可以有条件地将值保留在队列中,并随心所欲地选择如何限制排队的值。
好的!我有一个解决方案,不幸的是它涉及到我自己处理队列。由于可观察量和背压的性质,我认为这是必要的,我在这个 issue. In the past this could have been handled more simply with the controlled
operator, but it is deprecated 中发现了很多关于这个主题的讨论。相反,我只是简单地使用了一个 timer
observable 并自己包装了 mergeMap
的并发控制(尽管 mergeMap
仍然在内部管理并发作为安全措施。
const rateLimitToggle = (func, limit, rate, maxConcurrent, toggler) => {
const rateTimer = Rx.timer(0, rate).pipe(ops.mapTo(true))
return source =>
new Rx.Observable(subscriber => {
const concurrentLimiter = new Rx.Subject()
// stateful vars
const queue = []
let inProgress = 0
let closed = false
const enqueue = val => {
queue.push(val)
concurrentLimiter.next()
}
const dequeue = useRateLimit => {
const availableSlots = useRateLimit ? limit : maxConcurrent - inProgress
const numberToDequeue = Math.min(availableSlots, queue.length)
const nextVals = queue.splice(0, numberToDequeue)
inProgress += availableSlots
return nextVals
}
Rx.merge(Rx.of(true), toggler)
.pipe(
ops.switchMap(useRateLimiter => (useRateLimiter ? rateTimer : concurrentLimiter)),
ops.takeWhile(() => !closed || queue.length),
ops.mergeMap(dequeue),
ops.mergeMap(val => func(val), maxConcurrent)
)
.subscribe(val => {
inProgress--
concurrentLimiter.next()
subscriber.next(val)
})
source.subscribe({
next(val) {
enqueue(val)
},
complete() {
closed = true
}
})
})
}
用法示例:
const timeout = n => val => {
console.log('started', val)
return new Promise(resolve => setTimeout(() => resolve(val), n))
}
const emitter = new EventEmitter()
const toggler = Rx.fromEvent(emitter, 'useRateLimiter')
const downloader = Rx.range(0, 10).pipe(
rateLimitToggle(timeout(1000), 2, 1000, 10, toggler)
)
downloader.subscribe(val => console.log('finished', val))
setTimeout(() => {
console.log('now use concurrentLimiter')
emitter.emit('useRateLimiter', false)
}, 2000)
/* outputs:
started 0
started 1
started 2
started 3 // 0 - 3 all executed under rateLimiter
finished 0
finished 1
now use concurrentLimiter
started 4 // 4 - 9 executed under concurrentLimiter
started 5
started 6
started 7
started 8
started 9
finished 2
finished 3
finished 4
finished 5
finished 6
finished 7
finished 8
finished 9
*/