从不断变化的 Observables 列表中合并事件
Merge events from a changing list of Observables
我正在使用 rxjs
。
我有一个 Browser
负责许多 Page
对象。每个页面都有一个 Observable<Event>
产生事件流。
Page
对象在不同时间关闭和打开。我想创建一个名为 TheOneObservable
的可观察对象,它将合并来自所有当前活动的 Page
对象的所有事件,并合并来自 Browser
对象本身的自定义事件。
关闭 Page
意味着应该关闭对它的订阅,这样就不会阻止它被 GC。
我的问题是Pages
可以随时关闭,这意味着被合并的Observables的数量总是在变化。我想过使用 Pages
的 Observable 并使用 mergeMap
,但是这有问题。例如订阅者只会收到订阅后打开的页面的事件。
请注意,here .NET
已回答此问题,但使用 rxjs
中不可用的 ObservableCollection
。
下面是一些代码来说明问题:
class Page {
private _events = new Subject<Event>();
get events(): Observable<Event> {
return this._events.asObservable();
}
}
class Browser {
pages = [] as Page[];
private _ownEvents = new Subject<Event>();
addPage(page : Page) {
this.pages.push(page);
}
removePage(page : Page) {
let ixPage = this.pages.indexOf(page);
if (ixPage < 0) return;
this.pages.splice(ixPage, 1);
}
get oneObservable() {
//this won't work for aforementioned reasons
return Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents);
}
}
它是用 TypeScript 编写的,但应该可以理解。
您将需要自己管理对页面的订阅并将其事件馈送到结果主题中:
const theOneObservable$ = new Subject<Event>();
function openPage(page: Page): Subscription {
return page.events$.subscribe(val => this.theOneObservable$.next(val));
}
关闭页面,即在返回的订阅上调用 unsubscribe
,将完成它必须做的所有事情。
Note that theOneObservable$
is a hot observable here.
当然,您可以更进一步,编写自己的可观察类型来封装所有这些 API。特别是,这将允许您在关闭时取消订阅所有内部可观察对象。
稍微不同的方法是:
const observables$ = new Subject<Observable<Event>>();
const theOneObservable$ = observables$.mergeMap(obs$ => obs$);
// Add a page's events; note that takeUntil takes care of the
// unsubscription process here.
observables$.next(page.events$.takeUntil(page.closed$));
这种方法的优势在于它会在取消订阅可观察对象时自动取消订阅内部可观察对象。
您可以 switchMap()
在链接到数组更改的 Subject()
上,在数组更改时用新的 Observable 替换。
pagesChanged = new Rx.Subject();
addPage(page : Page) {
this.pages.push(page);
this.pagesChanged.next();
}
removePage(page : Page) {
let ixPage = this.pages.indexOf(page);
if (ixPage < 0) return;
this.pages.splice(ixPage, 1);
this.pagesChanged.next();
}
get oneObservable() {
return pagesChanged
.switchMap(changeEvent =>
Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents)
)
}
测试中,
const page1 = { events: Rx.Observable.of('page1Event') }
const page2 = { events: Rx.Observable.of('page2Event') }
let pages = [];
const pagesChanged = new Rx.Subject();
const addPage = (page) => {
pages.push(page);
pagesChanged.next();
}
const removePage = (page) => {
let ixPage = pages.indexOf(page);
if (ixPage < 0) return;
pages.splice(ixPage, 1);
pagesChanged.next();
}
const _ownEvents = Rx.Observable.of('ownEvent')
const oneObservable =
pagesChanged
.switchMap(pp =>
Rx.Observable.from(pages)
.mergeMap(x => x.events)
.merge(_ownEvents)
)
oneObservable.subscribe(x => console.log('subscribe', x))
console.log('adding 1')
addPage(page1)
console.log('adding 2')
addPage(page2)
console.log('removing 1')
removePage(page1)
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
我正在使用 rxjs
。
我有一个 Browser
负责许多 Page
对象。每个页面都有一个 Observable<Event>
产生事件流。
Page
对象在不同时间关闭和打开。我想创建一个名为 TheOneObservable
的可观察对象,它将合并来自所有当前活动的 Page
对象的所有事件,并合并来自 Browser
对象本身的自定义事件。
关闭 Page
意味着应该关闭对它的订阅,这样就不会阻止它被 GC。
我的问题是Pages
可以随时关闭,这意味着被合并的Observables的数量总是在变化。我想过使用 Pages
的 Observable 并使用 mergeMap
,但是这有问题。例如订阅者只会收到订阅后打开的页面的事件。
请注意,here .NET
已回答此问题,但使用 rxjs
中不可用的 ObservableCollection
。
下面是一些代码来说明问题:
class Page {
private _events = new Subject<Event>();
get events(): Observable<Event> {
return this._events.asObservable();
}
}
class Browser {
pages = [] as Page[];
private _ownEvents = new Subject<Event>();
addPage(page : Page) {
this.pages.push(page);
}
removePage(page : Page) {
let ixPage = this.pages.indexOf(page);
if (ixPage < 0) return;
this.pages.splice(ixPage, 1);
}
get oneObservable() {
//this won't work for aforementioned reasons
return Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents);
}
}
它是用 TypeScript 编写的,但应该可以理解。
您将需要自己管理对页面的订阅并将其事件馈送到结果主题中:
const theOneObservable$ = new Subject<Event>();
function openPage(page: Page): Subscription {
return page.events$.subscribe(val => this.theOneObservable$.next(val));
}
关闭页面,即在返回的订阅上调用 unsubscribe
,将完成它必须做的所有事情。
Note that
theOneObservable$
is a hot observable here.
当然,您可以更进一步,编写自己的可观察类型来封装所有这些 API。特别是,这将允许您在关闭时取消订阅所有内部可观察对象。
稍微不同的方法是:
const observables$ = new Subject<Observable<Event>>();
const theOneObservable$ = observables$.mergeMap(obs$ => obs$);
// Add a page's events; note that takeUntil takes care of the
// unsubscription process here.
observables$.next(page.events$.takeUntil(page.closed$));
这种方法的优势在于它会在取消订阅可观察对象时自动取消订阅内部可观察对象。
您可以 switchMap()
在链接到数组更改的 Subject()
上,在数组更改时用新的 Observable 替换。
pagesChanged = new Rx.Subject();
addPage(page : Page) {
this.pages.push(page);
this.pagesChanged.next();
}
removePage(page : Page) {
let ixPage = this.pages.indexOf(page);
if (ixPage < 0) return;
this.pages.splice(ixPage, 1);
this.pagesChanged.next();
}
get oneObservable() {
return pagesChanged
.switchMap(changeEvent =>
Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents)
)
}
测试中,
const page1 = { events: Rx.Observable.of('page1Event') }
const page2 = { events: Rx.Observable.of('page2Event') }
let pages = [];
const pagesChanged = new Rx.Subject();
const addPage = (page) => {
pages.push(page);
pagesChanged.next();
}
const removePage = (page) => {
let ixPage = pages.indexOf(page);
if (ixPage < 0) return;
pages.splice(ixPage, 1);
pagesChanged.next();
}
const _ownEvents = Rx.Observable.of('ownEvent')
const oneObservable =
pagesChanged
.switchMap(pp =>
Rx.Observable.from(pages)
.mergeMap(x => x.events)
.merge(_ownEvents)
)
oneObservable.subscribe(x => console.log('subscribe', x))
console.log('adding 1')
addPage(page1)
console.log('adding 2')
addPage(page2)
console.log('removing 1')
removePage(page1)
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>