如何创建一个包含其他流值数组的流?

How can I create a stream with an array of the value of other streams?

我正在尝试创建这样的结构:

+----o---->
+------------o---->
+-o---->
+------------o---->

mergeAsync()

+----------------->[o, o, o, o]

我正在寻找将一些流的值连接到数组中的方法,我尝试合并、连接或压缩结果,但我不想使用 merge/concat/zip.

jsbin code snippet

你知道有什么干净的方法可以做到这一点吗?

const log = (stream) => {
  return stream.subscribe(
    x => console.log(x),
    e => console.log(`onError: ${e}`),
    () => console.log('onCompleted')
  )
}

let m$ = Rx.Observable.range(99, 140).take(3)
const  = Rx.Observable.range(0, 10).take(3)

m$ = Rx.Observable.merge(, m$).toArray()

log(m$)

我可能可以像这样使用 zip 解决这个问题:

const log = (stream) => {
  return stream.subscribe(
    x => console.log(x),
    e => console.log(`onError: ${e}`),
    () => console.log('onCompleted')
  )
}

const s1 = Rx.Observable.of({key: ''})
const s2 = Rx.Observable.of({key: ''})
const s3 = Rx.Observable.of({key: ''})

let m$ = Rx.Observable.zip(s1, s2)

log(m$)

我删除了范围并改用对象(和 .of,但我可以使用 .return 或其他),因为在这种情况下没有意义。

但最终,我想将 s3 流添加到 m$ 中,但我不确定该怎么做...

听起来您可能正在寻找这样的东西:

var ss$ = new Rx.Subject();

const s1$ = Rx.Observable.range(0, 3);
const s2$ = Rx.Observable.range(10, 3)

ss$
  .switch()
  .scan((a, c) => {
    const newArray = a.slice();
    newArray.push(c);
    return newArray;
  }, [])
  .subscribe(x => console.log(x));

ss$.onNext(s1$);
ss$.onNext(s2$);

https://jsbin.com/fidowo/edit?js,console,output

ss$ 是一个 observables 的主题,你可以向它推送新的 observables。这些可观察量中的每一个都是一个数字流,每次将一个新元素添加到任何一个流中时,都会发出一个包含所有值的新数组。这就是你想要的吗?