在 Rxjs 中,如何展平或合并包含普通类型和 Observable 的流?
In Rxjs, how do I flatten or merge a stream containing both normal types and Observables?
类似于数组,flatten([1, 2 [3, 4], [5, 6]]) === [1, 2, 3, 4, 5, 6]
。
我想在 rxjs observables 中这样做:
const test$ = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
).mergeAll()
test$.subscribe(x => console.log(x)) //I want to output 1, 2, 3, 4, 5, 6, 7
mergeAll 不工作并抛出错误。
这是非常肮脏的解决方案:
const inElegant$ = Rx.Observable.merge(
test$.filter(x => x instanceof Rx.Observable).mergeAll(),
test$.filter(x => !(x instanceof Rx.Observable))
)
inElegant$.subscribe(x => console.log(x));
有没有更好的解决方案?
根据documentation mergeAll
Merges an observable sequence of observable sequences into an observable sequence.
你有一个混合集合(数字和可观察值)所以不起作用。
做到这一点的唯一方法是你正在做的事情,用不同的方法处理这两种类型:尽管你应该问问自己,为什么你在同一个序列中同时拥有 Observable 和原始类型。
我个人在这些情况下使用 toObservable
转换函数。该函数保持 observable 不变,并将其他类型包装在一个 observable 中(Rx.Observable.return
)。所以它是这样使用的:
const test$ = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
).map(toObservable).mergeAll()
这与您正在做的很接近,但我发现打包在一个单独的函数中很方便,可以在其他上下文中重复使用。
如果我们在表单上有一个流
const stream = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]), 8])
有几种方法可以将其转换为纯数字流(不包括您的解决方案中的过滤。)
这是三种可能的解决方案:
// prints 1, 2, 3, 4, 5, 6, 7
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.concatAll()
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7
stream
.selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.mergeAll()
.subscribe(x => console.log(x));
这看起来不错。但是,有几件事需要考虑。如果我们改变源流使其异步:
const asyncStream = Rx.Observable.interval(1000)
.select((val, idx) => idx + 8).take(5);
const stream = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]),
asyncStream, 13, 14, 15])
我们使用与之前相同的解决方案得到以下结果:
// prints 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.concatAll()
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
stream
.selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.mergeAll()
.subscribe(x => console.log(x));
所以总结一下。使用 selectMany
或 select
后跟 mergeAll
可以解决生成正确类型的扁平化列表的问题,但不会保持顺序。这些解决方案将监听所有流并在任何流产生值时产生结果。
concatAll
解决方案的行为略有不同。此解决方案将按顺序收听每个流,仅在最后一个流完成时切换到下一个 value/stream。
所以这些是一些解决方案,你想要哪个取决于你的需要。然而,所有这些都摆脱了过滤流的需要。
类似于数组,flatten([1, 2 [3, 4], [5, 6]]) === [1, 2, 3, 4, 5, 6]
。
我想在 rxjs observables 中这样做:
const test$ = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
).mergeAll()
test$.subscribe(x => console.log(x)) //I want to output 1, 2, 3, 4, 5, 6, 7
mergeAll 不工作并抛出错误。
这是非常肮脏的解决方案:
const inElegant$ = Rx.Observable.merge(
test$.filter(x => x instanceof Rx.Observable).mergeAll(),
test$.filter(x => !(x instanceof Rx.Observable))
)
inElegant$.subscribe(x => console.log(x));
有没有更好的解决方案?
根据documentation mergeAll
Merges an observable sequence of observable sequences into an observable sequence.
你有一个混合集合(数字和可观察值)所以不起作用。
做到这一点的唯一方法是你正在做的事情,用不同的方法处理这两种类型:尽管你应该问问自己,为什么你在同一个序列中同时拥有 Observable 和原始类型。
我个人在这些情况下使用 toObservable
转换函数。该函数保持 observable 不变,并将其他类型包装在一个 observable 中(Rx.Observable.return
)。所以它是这样使用的:
const test$ = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7])]
).map(toObservable).mergeAll()
这与您正在做的很接近,但我发现打包在一个单独的函数中很方便,可以在其他上下文中重复使用。
如果我们在表单上有一个流
const stream = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]), 8])
有几种方法可以将其转换为纯数字流(不包括您的解决方案中的过滤。)
这是三种可能的解决方案:
// prints 1, 2, 3, 4, 5, 6, 7
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.concatAll()
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7
stream
.selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.mergeAll()
.subscribe(x => console.log(x));
这看起来不错。但是,有几件事需要考虑。如果我们改变源流使其异步:
const asyncStream = Rx.Observable.interval(1000)
.select((val, idx) => idx + 8).take(5);
const stream = Rx.Observable.from(
[1, 2, Rx.Observable.from([3, 4]), 5, Rx.Observable.from([6, 7]),
asyncStream, 13, 14, 15])
我们使用与之前相同的解决方案得到以下结果:
// prints 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.concatAll()
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
stream
.selectMany(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.subscribe(x => console.log(x));
// prints 1, 2, 3, 4, 5, 6, 7, 13, 14, 15, 8, 9, 10, 11, 12
stream
.select(e => typeof e == 'number' ? Rx.Observable.from([e]) : e)
.mergeAll()
.subscribe(x => console.log(x));
所以总结一下。使用 selectMany
或 select
后跟 mergeAll
可以解决生成正确类型的扁平化列表的问题,但不会保持顺序。这些解决方案将监听所有流并在任何流产生值时产生结果。
concatAll
解决方案的行为略有不同。此解决方案将按顺序收听每个流,仅在最后一个流完成时切换到下一个 value/stream。
所以这些是一些解决方案,你想要哪个取决于你的需要。然而,所有这些都摆脱了过滤流的需要。