我怎样才能一次只激活 X 个可观察对象?

How can I have only X observables active at a time?

I'm using RxJS v6, but the answer could apply to RxJS v5 as well.

我想这样做,如果我说了 8 个值,我一次只会有 4 个被主动处理。

我现在的代码发送 4 个项目,然后等到所有 4 个都完成,然后发送下一个 4 个。我希望它发送前 4 个,然后当一个 observable 完成时,另一个进来处理。

from([1, 2, 3, 4, 5, 6, 7, 8])
.pipe(
    bufferCount(4),
    concatMap(values => (
        from(values)
        .pipe(
            mergeMap(value => (
                timer(1000) // Async stuff would happen here
                .pipe(
                    mapTo(value),
                )
            )),
        )
    )),
)
.subscribe(console.log)

你必须使用 mergeMap 的第二个参数,它是一个允许指定 concurrency level 的数字你要。

所以你的代码看起来像

from([1, 2, 3, 4, 5, 6, 7, 8])
.pipe(
    map(val => of(val)),
    mergeMap(val => val, 4),
)
.subscribe(console.log)

of([1, 2, 3, 4, 5, 6, 7, 8])
.pipe(
    mergeMap(val => val, 4),
)
.subscribe(console.log)

考虑 concatMapmergeMap,并发级别设置为 1。

阅读 了解有关 mergeMap 的更多详细信息。