我怎样才能一次只激活 X 个可观察对象?
How can I have only X observables active at a time?
I'm using RxJS v6, but the answer could apply to RxJS v5 as well.
我想这样做,如果我说了 8 个值,我一次只会有 4 个被主动处理。
我现在的代码发送 4 个项目,然后等到所有 4 个都完成,然后发送下一个 4 个。我希望它发送前 4 个,然后当一个 observable 完成时,另一个进来处理。
from([1, 2, 3, 4, 5, 6, 7, 8])
.pipe(
bufferCount(4),
concatMap(values => (
from(values)
.pipe(
mergeMap(value => (
timer(1000) // Async stuff would happen here
.pipe(
mapTo(value),
)
)),
)
)),
)
.subscribe(console.log)
你必须使用 mergeMap
的第二个参数,它是一个允许指定 concurrency level 的数字你要。
所以你的代码看起来像
from([1, 2, 3, 4, 5, 6, 7, 8])
.pipe(
map(val => of(val)),
mergeMap(val => val, 4),
)
.subscribe(console.log)
或
of([1, 2, 3, 4, 5, 6, 7, 8])
.pipe(
mergeMap(val => val, 4),
)
.subscribe(console.log)
考虑 concatMap
是 mergeMap
,并发级别设置为 1。
阅读 了解有关 mergeMap
的更多详细信息。
I'm using RxJS v6, but the answer could apply to RxJS v5 as well.
我想这样做,如果我说了 8 个值,我一次只会有 4 个被主动处理。
我现在的代码发送 4 个项目,然后等到所有 4 个都完成,然后发送下一个 4 个。我希望它发送前 4 个,然后当一个 observable 完成时,另一个进来处理。
from([1, 2, 3, 4, 5, 6, 7, 8])
.pipe(
bufferCount(4),
concatMap(values => (
from(values)
.pipe(
mergeMap(value => (
timer(1000) // Async stuff would happen here
.pipe(
mapTo(value),
)
)),
)
)),
)
.subscribe(console.log)
你必须使用 mergeMap
的第二个参数,它是一个允许指定 concurrency level 的数字你要。
所以你的代码看起来像
from([1, 2, 3, 4, 5, 6, 7, 8])
.pipe(
map(val => of(val)),
mergeMap(val => val, 4),
)
.subscribe(console.log)
或
of([1, 2, 3, 4, 5, 6, 7, 8])
.pipe(
mergeMap(val => val, 4),
)
.subscribe(console.log)
考虑 concatMap
是 mergeMap
,并发级别设置为 1。
阅读 mergeMap
的更多详细信息。