组合固定数量的 mergeMapped observables - RxJS

Combine a fixed number of mergeMapped observables - RxJS

我使用的是 RxJS v6,但这个问题也适用于 v5。

当使用 mergeMap 时,我的原始数组消失了,虽然我可以并行执行许多操作,但当我发送到 mergeMap 的所有那些可观察对象都被删除时,我不再有监控方式完成。

例子

of([1, 2, 3, 4])
.pipe(
    mergeMap(values => values),
)
.subscribe(console.log)

// 1
// 2
// 3
// 4

我想看:

// [1, 2, 3, 4]

到目前为止我想出的唯一方法是获取数组的长度,但我确定一定有一些运算符是我遗漏的:

of([1, 2, 3, 4])
.pipe(
    switchMap(values => (
        of(values)
        .pipe(
            mergeMap(value => value),
            bufferCount(values.length),
        )
    ))
)
.subscribe(console.log)

When using mergeMap, my original array disappears

原因是 mergeMap 接受一个 ObservableInput 作为您传入的函数的参数。javascript Array 是一个 ObservableInput,因此在 mergeMap 中工作,mergeMap 完成它的工作,即 展平 ObservableInput(考虑到 mergeMap 之前被称为 flatMap).

所以,正如@cartant 所说,如果你想返回一个数组,你必须使用 toArray 运算符。 也就是说

of([1, 2, 3, 4])
.pipe(
    mergeMap(value => { // do stuff with a value of an array}),
    toArray()
)
.subscribe(console.log)

等同于

of([1, 2, 3, 4])
.pipe(
    map(values => values.map(value => { // do stuff with a value of an array})),
)

如果您的数组虽然包含 Observables,并且您希望最终获得它们在所有 Observable 发出时通知的值,那么您必须使用 forkJoin。这是一个简单的例子

of([1, 2, 3, 4].map(n => of(n)))
.pipe(
    switchMap(observablesOfValues => forkJoin(observablesOfValues))
)
.subscribe(console.log)