RxJs 如何将两个重叠的 observable 合并为一个
RxJs how to merge two overlapping observable into one
我有两个可观察值:
-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-|
-13--14--15--16--17--18--19-----20---------21--------------22------23--24-->
第一个包含一些递增的数字,但一段时间后停止(这些是数据库的游标结果)
第二个正在不断增加数量。从一开始就包含一些数字,但不要停止发射。 (这些是新插入数据库的数据)
我希望这两个 observable 看起来像这样一个连续的 observable:
-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-16-17-18-19-20-21-----22------23--24-->
这个 observable 只包含每个数字一次,保持发射顺序。
如何使用尽可能少的内存来解决?
您可以通过将第一个流中的所有元素与第二个流连接(.concat
),除了最新的(.last
)之前的(.skipWhile
包括)元素
const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15'
const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24'
const fromMarble = str => Rx.Observable.defer(() => {
console.log('side effect from subscribing to: ' + str);
return Rx.Observable.from(str.split('-').filter(v => v.length));
});
const a$ = fromMarble(a);
const b$ = fromMarble(b);
const distinct$ = Rx.Observable.concat(
a$,
a$.last().switchMap(latest =>
// .skipWhile + .skip(1) => skipWhile but inclusive
b$.skipWhile(v => v !== latest).skip(1)
),
);
distinct$.subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
此外,如果您在订阅时有副作用(例如,当您订阅时 - 将创建新游标),您可以通过使用 const a$ = fromMarble(a).shareReaplay()
等方式为所有订阅者共享该副作用。
您可以阅读更多关于分享副作用的内容:
- 在 RxJS v4 的旧文档中 - 4.8 Use the publish operator to share side-effects
- 和这篇文章 - RxSwift: share vs replay vs shareReplay
我认为这里最好的方法是缓冲 b$ 直到 a$ 流到达 b$,然后发出 b$ 的所有缓冲项目并切换到 b$。像这样:
const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15';
const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24';
const fromMarble = str => Rx.Observable.from(str.split('-')).concatMap(x => Rx.Observable.of(x).delay(1)).filter(v => v.length).map(x => parseInt(x));
const a$ = fromMarble(a).share();
const b$ = fromMarble(b).share();
const switchingSignal$ = Rx.Observable.combineLatest(a$, b$.take(1), (a, b) => a >= b).filter(x => x).take(1).share();
const distinct$ = Rx.Observable.merge(
a$.takeUntil(switchingSignal$).map(x => x + '(from a)'),
b$.buffer(switchingSignal$).take(1).mergeMap(buffered => Rx.Observable.from(buffered)).map(x => x + '(from b$ buffer)'),
b$.skipUntil(switchingSignal$).map(x => x + '(from b$)')
);
distinct$.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.0/Rx.js"></script>
我有两个可观察值:
-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-|
-13--14--15--16--17--18--19-----20---------21--------------22------23--24-->
第一个包含一些递增的数字,但一段时间后停止(这些是数据库的游标结果) 第二个正在不断增加数量。从一开始就包含一些数字,但不要停止发射。 (这些是新插入数据库的数据)
我希望这两个 observable 看起来像这样一个连续的 observable:
-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15-16-17-18-19-20-21-----22------23--24-->
这个 observable 只包含每个数字一次,保持发射顺序。
如何使用尽可能少的内存来解决?
您可以通过将第一个流中的所有元素与第二个流连接(.concat
),除了最新的(.last
)之前的(.skipWhile
包括)元素
const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15'
const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24'
const fromMarble = str => Rx.Observable.defer(() => {
console.log('side effect from subscribing to: ' + str);
return Rx.Observable.from(str.split('-').filter(v => v.length));
});
const a$ = fromMarble(a);
const b$ = fromMarble(b);
const distinct$ = Rx.Observable.concat(
a$,
a$.last().switchMap(latest =>
// .skipWhile + .skip(1) => skipWhile but inclusive
b$.skipWhile(v => v !== latest).skip(1)
),
);
distinct$.subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
此外,如果您在订阅时有副作用(例如,当您订阅时 - 将创建新游标),您可以通过使用 const a$ = fromMarble(a).shareReaplay()
等方式为所有订阅者共享该副作用。
您可以阅读更多关于分享副作用的内容:
- 在 RxJS v4 的旧文档中 - 4.8 Use the publish operator to share side-effects
- 和这篇文章 - RxSwift: share vs replay vs shareReplay
我认为这里最好的方法是缓冲 b$ 直到 a$ 流到达 b$,然后发出 b$ 的所有缓冲项目并切换到 b$。像这样:
const a = '-1-2-3-4-5-6-7-8-9-10-11-12-13-14-15';
const b = '-13--14--15--16--17--18--19-----20---------21--------------22------23--24';
const fromMarble = str => Rx.Observable.from(str.split('-')).concatMap(x => Rx.Observable.of(x).delay(1)).filter(v => v.length).map(x => parseInt(x));
const a$ = fromMarble(a).share();
const b$ = fromMarble(b).share();
const switchingSignal$ = Rx.Observable.combineLatest(a$, b$.take(1), (a, b) => a >= b).filter(x => x).take(1).share();
const distinct$ = Rx.Observable.merge(
a$.takeUntil(switchingSignal$).map(x => x + '(from a)'),
b$.buffer(switchingSignal$).take(1).mergeMap(buffered => Rx.Observable.from(buffered)).map(x => x + '(from b$ buffer)'),
b$.skipUntil(switchingSignal$).map(x => x + '(from b$)')
);
distinct$.subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.0/Rx.js"></script>