在 Rxjs 中,如何展平或合并包含普通类型和 Observable 的流?

In Rxjs, how do I flatten or merge a stream containing both normal types and Observables?

类似于数组,flatten([1, 2 [3, 4], [5, 6]]) === [1, 2, 3, 4, 5, 6]

我想在 rxjs observables 中这样做:

const test$ = Rx.Observable.from(
    [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
).mergeAll()

test$.subscribe(x => console.log(x)) //I want to output 1, 2, 3, 4, 5, 6, 7

mergeAll 不工作并抛出错误。

这是非常肮脏的解决方案:

const inElegant$ = Rx.Observable.merge(
  test$.filter(x => x instanceof Rx.Observable).mergeAll(),
  test$.filter(x => !(x instanceof Rx.Observable))
)

inElegant$.subscribe(x => console.log(x));

有没有更好的解决方案?

Jsbin http://jsbin.com/vohizoqiza/1/edit?js,console

根据documentation mergeAll

Merges an observable sequence of observable sequences into an observable sequence.

你有一个混合集合(数字和可观察值)所以不起作用。

做到这一点的唯一方法是你正在做的事情,用不同的方法处理这两种类型:尽管你应该问问自己,为什么你在同一个序列中同时拥有 Observable 和原始类型。

我个人在这些情况下使用 toObservable 转换函数。该函数保持 observable 不变,并将其他类型包装在一个 observable 中(Rx.Observable.return)。所以它是这样使用的:

const test$ = Rx.Observable.from(
    [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
).map(toObservable).mergeAll()

这与您正在做的很接近,但我发现打包在一个单独的函数中很方便,可以在其他上下文中重复使用。

如果我们在表单上有一个流

const stream = Rx.Observable.from(
    [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]), 8])

有几种方法可以将其转换为纯数字流(不包括您的解决方案中的过滤。)

这是三种可能的解决方案:

// prints 1, 2, 3, 4, 5, 6, 7
stream
    .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .concatAll()
    .subscribe(x => console.log(x));

// prints 1, 2, 3, 4, 5, 6, 7
stream
    .selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .subscribe(x => console.log(x));

// prints 1, 2, 3, 4, 5, 6, 7
stream
    .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .mergeAll()
    .subscribe(x => console.log(x));

这看起来不错。但是,有几件事需要考虑。如果我们改变源流使其异步:

const asyncStream = Rx.Observable.interval(1000)
    .select((val, idx) => idx + 8).take(5);

const stream = Rx.Observable.from(
    [1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]),
    asyncStream, 13, 14, 15])

我们使用与之前相同的解决方案得到以下结果:

// prints 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
stream
    .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .concatAll()
    .subscribe(x => console.log(x));

// prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
stream
    .selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .subscribe(x => console.log(x));

// prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
stream
    .select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
    .mergeAll()
    .subscribe(x => console.log(x));

所以总结一下。使用 selectManyselect 后跟 mergeAll 可以解决生成正确类型的扁平化列表的问题,但不会保持顺序。这些解决方案将监听所有流并在任何流产生值时产生结果。

concatAll 解决方案的行为略有不同。此解决方案将按顺序收听每个流,仅在最后一个流完成时切换到下一个 value/stream。

所以这些是一些解决方案,你想要哪个取决于你的需要。然而,所有这些都摆脱了过滤流的需要。