合并实现为 flatMap

Merge implemented as flatMap

理论上应该可以通过 flatMap() 实现任何 RxJS 运算符(just()flatMap() 除外)。例如 map() 可以实现为

function map(source, selector) {
  return source.flatMap(x => Rx.Observable.just(selector(x)));
}

如何通过flatMap()实现merge()? (当然也要避免 mergeAll()

如果您利用 flatMap 也可以采用数组 return 值这一事实,这看起来是可能的。

Rx.Observable.prototype.merge = function(other) {
  var source = this;
  return Rx.Observable.just([source, other])
           //Flattens the array into observable of observables
           .flatMap(function(arr) { return arr; })
           //Flatten out the observables
           .flatMap(function(x) { return x; });
}

编辑 1

使用 RxJS 6 和 pipe 语法

import {of} from 'rxjs'
import {flatMap} from 'rxjs/operators'

function merge (other) {
  return source => of([source, other]).pipe(
           //Flattens the array into observable of observables
           flatMap(arr => arr)
           //Flatten out the observables
           flatMap(x => x)
         );
}

const {timestamp, map, flatMap, take} = rxjs.operators;
const {interval, of: just} = rxjs;

const source1 = interval(2000).pipe(
  timestamp(),
  map(x => "Interval 1 at " + x.timestamp + " w/ " + x.value)
)

const source2 = interval(3000).pipe(
  timestamp(),
  map(x => "Interval 2 at " + x.timestamp + " w/ " + x.value)
)

function mergeFromFlatMap (other) {
  return source => just([source, other]).pipe(
    flatMap(arr => arr),
    flatMap(seq => seq)
  )
}

source1.pipe(
  mergeFromFlatMap(source2),
  take(20)
).subscribe(console.log.bind(console));
<script src="https://unpkg.com/rxjs/bundles/rxjs.umd.min.js"></script>