Rx.js 承诺并发
Rx.js concurrency with promises
我想通过一系列 async/network 操作(远程 HTTP 请求)来处理一组对象。
在其中一些操作中,我想确保同时处理的项目不超过 X。
我怎样才能做到这一点?
示例代码:
function someAsyncOp(item) {...} // returns a promise
var source = Rx.Observable.from([{item1},{item2},...])
source
.flatMap((item) => {
// I WANT THE FOLLOWING OPERATION TO BE EXECUTING
// ON AT MAX 10 ITEMS AT A TIME, NEXT ITEM SHOULD
// BE SUBMITTED ONLY WHEN A SLOT GETS FREED AS A
// RESULT OF THE PROMISE SUCCEEDING OR FAILING
return Rx.Observable.fromPromise(someAsyncOp(item))
})
.subscribe(
console.log,
console.error,
() => console.log('completed')
)
您可以使用 merge
和 map
而不是 flatMap
:
var concurrency = 10;
source.map(someAsyncOp).merge(concurrency).subscribe(x => console.log(x));
请注意,由于 promises 是热切的,而 observables 是懒惰的,因此 fromPromise 不会削减它(并且 Rx 无论如何也可以在没有它的情况下吸收 promises)。我建议将其包装在 create
.
中
var delay = function(ms){ return new Promise(function(r){ setTimeout(r, 2000, ms) }); }
var log = function(msg){ document.body.innerHTML += msg + "<br />"; }
Rx.Observable.range(1000, 10).map(delay).merge(2).subscribe(log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.js"></script>
flatMap
有一个名为 flatMapWithMaxConcurrent
的兄弟,它采用并发参数。它在功能上类似于本杰明的回答所建议的map(fn).merge(n)
。
function someAsyncOp(item) {...} // returns a promise
var source = Rx.Observable.from([{item1},{item2},...])
source
//Only allow a max of 10 items to be subscribed to at once
.flatMapWithMaxConcurrent(10, (item) => {
//Since a promise is eager you need to defer execution of the function
//that produces it until subscription. Defer will implicitly accept a promise
return Rx.Observable.defer(() => someAsyncOp(item))
//If you want the whole thing to continue regardless of exceptions you should also
//catch errors from the individual processes
.catch(Rx.Observable.empty())
})
.subscribe(
console.log,
console.error,
() => console.log('completed')
)
我想通过一系列 async/network 操作(远程 HTTP 请求)来处理一组对象。
在其中一些操作中,我想确保同时处理的项目不超过 X。
我怎样才能做到这一点?
示例代码:
function someAsyncOp(item) {...} // returns a promise
var source = Rx.Observable.from([{item1},{item2},...])
source
.flatMap((item) => {
// I WANT THE FOLLOWING OPERATION TO BE EXECUTING
// ON AT MAX 10 ITEMS AT A TIME, NEXT ITEM SHOULD
// BE SUBMITTED ONLY WHEN A SLOT GETS FREED AS A
// RESULT OF THE PROMISE SUCCEEDING OR FAILING
return Rx.Observable.fromPromise(someAsyncOp(item))
})
.subscribe(
console.log,
console.error,
() => console.log('completed')
)
您可以使用 merge
和 map
而不是 flatMap
:
var concurrency = 10;
source.map(someAsyncOp).merge(concurrency).subscribe(x => console.log(x));
请注意,由于 promises 是热切的,而 observables 是懒惰的,因此 fromPromise 不会削减它(并且 Rx 无论如何也可以在没有它的情况下吸收 promises)。我建议将其包装在 create
.
var delay = function(ms){ return new Promise(function(r){ setTimeout(r, 2000, ms) }); }
var log = function(msg){ document.body.innerHTML += msg + "<br />"; }
Rx.Observable.range(1000, 10).map(delay).merge(2).subscribe(log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.js"></script>
flatMap
有一个名为 flatMapWithMaxConcurrent
的兄弟,它采用并发参数。它在功能上类似于本杰明的回答所建议的map(fn).merge(n)
。
function someAsyncOp(item) {...} // returns a promise
var source = Rx.Observable.from([{item1},{item2},...])
source
//Only allow a max of 10 items to be subscribed to at once
.flatMapWithMaxConcurrent(10, (item) => {
//Since a promise is eager you need to defer execution of the function
//that produces it until subscription. Defer will implicitly accept a promise
return Rx.Observable.defer(() => someAsyncOp(item))
//If you want the whole thing to continue regardless of exceptions you should also
//catch errors from the individual processes
.catch(Rx.Observable.empty())
})
.subscribe(
console.log,
console.error,
() => console.log('completed')
)