从不断变化的 Observables 列表中合并事件

Merge events from a changing list of Observables

我正在使用 rxjs

我有一个 Browser 负责许多 Page 对象。每个页面都有一个 Observable<Event> 产生事件流。

Page 对象在不同时间关闭和打开。我想创建一个名为 TheOneObservable 的可观察对象,它将合并来自所有当前活动的 Page 对象的所有事件,并合并来自 Browser 对象本身的自定义事件。

关闭 Page 意味着应该关闭对它的订阅,这样就不会阻止它被 GC。

我的问题是Pages可以随时关闭,这意味着被合并的Observables的数量总是在变化。我想过使用 Pages 的 Observable 并使用 mergeMap,但是这有问题。例如订阅者只会收到订阅后打开的页面的事件。


请注意,here .NET 已回答此问题,但使用 rxjs 中不可用的 ObservableCollection


下面是一些代码来说明问题:

class Page {
    private _events = new Subject<Event>();

    get events(): Observable<Event> {
        return this._events.asObservable();
    }
}

class Browser {
    pages = [] as Page[];
    private _ownEvents = new Subject<Event>();

    addPage(page : Page) {
        this.pages.push(page);
    }

    removePage(page : Page) {
        let ixPage = this.pages.indexOf(page);
        if (ixPage < 0) return;
        this.pages.splice(ixPage, 1);
    }

    get oneObservable() {
        //this won't work for aforementioned reasons
        return Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents);
    }
}

它是用 TypeScript 编写的,但应该可以理解。

您将需要自己管理对页面的订阅并将其事件馈送到结果主题中:

const theOneObservable$ = new Subject<Event>();

function openPage(page: Page): Subscription {
  return page.events$.subscribe(val => this.theOneObservable$.next(val));
}

关闭页面,即在返回的订阅上调用 unsubscribe,将完成它必须做的所有事情。

Note that theOneObservable$ is a hot observable here.

当然,您可以更进一步,编写自己的可观察类型来封装所有这些 API。特别是,这将允许您在关闭时取消订阅所有内部可观察对象。


稍微不同的方法是:

const observables$ = new Subject<Observable<Event>>();
const theOneObservable$ = observables$.mergeMap(obs$ => obs$);

// Add a page's events; note that takeUntil takes care of the
// unsubscription process here.
observables$.next(page.events$.takeUntil(page.closed$));

这种方法的优势在于它会在取消订阅可观察对象时自动取消订阅内部可观察对象。

您可以 switchMap() 在链接到数组更改的 Subject() 上,在数组更改时用新的 Observable 替换。

pagesChanged = new Rx.Subject();

addPage(page : Page) {
  this.pages.push(page);
  this.pagesChanged.next();
}

removePage(page : Page) {
  let ixPage = this.pages.indexOf(page);
  if (ixPage < 0) return;
  this.pages.splice(ixPage, 1);
  this.pagesChanged.next();
}

get oneObservable() {
  return pagesChanged
    .switchMap(changeEvent =>
      Observable.from(this.pages).mergeMap(x => x.events).merge(this._ownEvents)
    )
}

测试中,

const page1 = { events: Rx.Observable.of('page1Event') }
const page2 = { events: Rx.Observable.of('page2Event') }

let pages = []; 
const pagesChanged = new Rx.Subject();
const addPage = (page) => { 
  pages.push(page); 
  pagesChanged.next(); 
}
const removePage = (page) => { 
  let ixPage = pages.indexOf(page);
  if (ixPage < 0) return;
  pages.splice(ixPage, 1);
  pagesChanged.next(); 
}

const _ownEvents = Rx.Observable.of('ownEvent')

const oneObservable = 
  pagesChanged
    .switchMap(pp => 
      Rx.Observable.from(pages)
        .mergeMap(x => x.events)
        .merge(_ownEvents)
    )

oneObservable.subscribe(x => console.log('subscribe', x))

console.log('adding 1')
addPage(page1)
console.log('adding 2')
addPage(page2)
console.log('removing 1')
removePage(page1)
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>