RxJs 如何将两个重叠的 observable 合并为一个

RxJs how to merge two overlapping observable into one

我有两个可观察值:

-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-|
-13--14--15--16--17--18--19-----20---------21--------------22------23--24-->

第一个包含一些递增的数字,但一段时间后停止(这些是数据库的游标结果) 第二个正在不断增加数量。从一开始就包含一些数字,但不要停止发射。 (这些是新插入数据库的数据)

我希望这两个 observable 看起来像这样一个连续的 observable:

-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-16-17-18-19-20-21-----22------23--24-->

这个 observable 只包含每个数字一次,保持发射顺序。

如何使用尽可能少的内存来解决?

您可以通过将第一个流中的所有元素与第二个流连接(.concat),除了最新的(.last)之前的(.skipWhile包括)元素

const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15'
const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24'
const fromMarble = str => Rx.Observable.defer(() => {
  console.log('side effect from subscribing to: ' + str);
  return Rx.Observable.from(str.split('-').filter(v => v.length));
});

const a$ = fromMarble(a);
const b$ = fromMarble(b);

const distinct$ = Rx.Observable.concat(
  a$,
  a$.last().switchMap(latest =>
    // .skipWhile + .skip(1) => skipWhile but inclusive
    b$.skipWhile(v => v !== latest).skip(1)
  ),
);

distinct$.subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

此外,如果您在订阅时有副作用(例如,当您订阅时 - 将创建新游标),您可以通过使用 const a$ = fromMarble(a).shareReaplay() 等方式为所有订阅者共享该副作用。

您可以阅读更多关于分享副作用的内容:

我认为这里最好的方法是缓冲 b$ 直到 a$ 流到达 b$,然后发出 b$ 的所有缓冲项目并切换到 b$。像这样:

const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15';
const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24';

const fromMarble = str => Rx.Observable.from(str.split('-')).concatMap(x => Rx.Observable.of(x).delay(1)).filter(v => v.length).map(x => parseInt(x));

const a$ = fromMarble(a).share();
const b$ = fromMarble(b).share();

const switchingSignal$ = Rx.Observable.combineLatest(a$, b$.take(1), (a, b) => a >= b).filter(x => x).take(1).share();

const distinct$ = Rx.Observable.merge(
 a$.takeUntil(switchingSignal$).map(x => x + '(from a)'), 
 b$.buffer(switchingSignal$).take(1).mergeMap(buffered => Rx.Observable.from(buffered)).map(x => x + '(from b$ buffer)'),
 b$.skipUntil(switchingSignal$).map(x => x + '(from b$)')
);

distinct$.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.0/Rx.js"></script>