合并实现为 flatMap
Merge implemented as flatMap
理论上应该可以通过 flatMap()
实现任何 RxJS 运算符(just()
和 flatMap()
除外)。例如 map()
可以实现为
function map(source, selector) {
return source.flatMap(x => Rx.Observable.just(selector(x)));
}
如何通过flatMap()
实现merge()
? (当然也要避免 mergeAll()
)
如果您利用 flatMap 也可以采用数组 return 值这一事实,这看起来是可能的。
Rx.Observable.prototype.merge = function(other) {
var source = this;
return Rx.Observable.just([source, other])
//Flattens the array into observable of observables
.flatMap(function(arr) { return arr; })
//Flatten out the observables
.flatMap(function(x) { return x; });
}
编辑 1
使用 RxJS 6 和 pipe
语法
import {of} from 'rxjs'
import {flatMap} from 'rxjs/operators'
function merge (other) {
return source => of([source, other]).pipe(
//Flattens the array into observable of observables
flatMap(arr => arr)
//Flatten out the observables
flatMap(x => x)
);
}
const {timestamp, map, flatMap, take} = rxjs.operators;
const {interval, of: just} = rxjs;
const source1 = interval(2000).pipe(
timestamp(),
map(x => "Interval 1 at " + x.timestamp + " w/ " + x.value)
)
const source2 = interval(3000).pipe(
timestamp(),
map(x => "Interval 2 at " + x.timestamp + " w/ " + x.value)
)
function mergeFromFlatMap (other) {
return source => just([source, other]).pipe(
flatMap(arr => arr),
flatMap(seq => seq)
)
}
source1.pipe(
mergeFromFlatMap(source2),
take(20)
).subscribe(console.log.bind(console));
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>
理论上应该可以通过 flatMap()
实现任何 RxJS 运算符(just()
和 flatMap()
除外)。例如 map()
可以实现为
function map(source, selector) {
return source.flatMap(x => Rx.Observable.just(selector(x)));
}
如何通过flatMap()
实现merge()
? (当然也要避免 mergeAll()
)
如果您利用 flatMap 也可以采用数组 return 值这一事实,这看起来是可能的。
Rx.Observable.prototype.merge = function(other) {
var source = this;
return Rx.Observable.just([source, other])
//Flattens the array into observable of observables
.flatMap(function(arr) { return arr; })
//Flatten out the observables
.flatMap(function(x) { return x; });
}
编辑 1
使用 RxJS 6 和 pipe
语法
import {of} from 'rxjs'
import {flatMap} from 'rxjs/operators'
function merge (other) {
return source => of([source, other]).pipe(
//Flattens the array into observable of observables
flatMap(arr => arr)
//Flatten out the observables
flatMap(x => x)
);
}
const {timestamp, map, flatMap, take} = rxjs.operators;
const {interval, of: just} = rxjs;
const source1 = interval(2000).pipe(
timestamp(),
map(x => "Interval 1 at " + x.timestamp + " w/ " + x.value)
)
const source2 = interval(3000).pipe(
timestamp(),
map(x => "Interval 2 at " + x.timestamp + " w/ " + x.value)
)
function mergeFromFlatMap (other) {
return source => just([source, other]).pipe(
flatMap(arr => arr),
flatMap(seq => seq)
)
}
source1.pipe(
mergeFromFlatMap(source2),
take(20)
).subscribe(console.log.bind(console));
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>