检索 Observable 订阅者并让他们订阅另一个 Observable
Retrieve Observable subscribers and make them subscribe to another Observable
简单地说
给定一个现有的 Observable(尚未完成),有没有办法检索关联的订阅者(传递给 subscribe 的函数)以使他们订阅另一个 Observable?
上下文
我的应用程序中的一项服务有助于创建 SeverEvent 连接,将 ConnectableObservable 返回到代理连接并允许使用 进行多播publish 运算符。该服务通过内部存储跟踪现有连接:
store: {[key: string]: ConnectionTracker};
// …
interface ConnectionTracker {
url: string;
eventSource: EventSource;
observable: rx.ConnectableObservable<any>;
subscription: rx.Subscription;
observer: rx.Observer<any>;
data?: any; // Arbitrary data
}
创建连接后,如果关联的跟踪器已经存在(身份是使用连接的端点创建的),则服务应该:
- 确定
关闭现有跟踪器的 ServerEvent 连接
- ok
打开一个新的 SerevrEvent 连接(因此一个新的 ConnectableObservable)
- 将现有跟踪器的 Observable 替换为新的 Observable 但让现有订阅者现在订阅新的 Observable
这是创建 ConnectionTrackers
的代码部分
/**
* Create/Update a ServerEvent connection tracker
*/
createTracker<T>(endpoint: string, queryString: string = null): ConnectionTracker
{
let fullUri = endpoint + (queryString ? `?${queryString}` : '')
, tracker = this.findTrackerByEndpoint(endpoint) || {
observable: null,
fullUri: fullUri,
eventSource: null,
observer: null,
subscription: null
}
;
// Tracker exists
if (tracker.observable !== null) {
// If fullUri hasn't changed, use the tracker as is
if (tracker.fullUri === fullUri) {
return tracker;
}
// At this point, we know "fullUri" has changed, the tracker's
// connection should be replaced with a fresh one
// ⇒ TODO
// ⇒ Gather old tracker.observable's subscribers/subscriptions to make
// them subscribe to the new Observable instead (created down below)
// Terminate previous connection and clean related resouces
tracker.observer.complete();
tracker.eventSource.close();
}
tracker.eventSource = new EventSource(<any>fullUri, {withCredentials: true});
tracker.observable = rx.Observable.create((observer: rx.Observer<T>) => {
// Executed once
tracker.eventSource.onmessage = e => observer.next(JSON.parse(e.data));
tracker.eventSource.onerror = e => observer.error(e);
// Keep track of the observer
tracker.observer = observer;
})
// Transform Observable into a ConnectableObservable for multicast
.publish()
;
// Start emitting right away and also keep a reference to
// proxy subscription for later disposal
tracker.subscription = tracker.observable.connect();
return tracker;
}
谢谢。
与其尝试手动将订阅者从一个 Observable 转移到另一个,不如为侦听器提供一个 Observable,它会在需要时自动切换到另一个 Observable。
你通过使用一个高阶 Observable(一个发出 Observable 的 Observable)来做到这一点,它总是切换到最近的内部 Observable。
基本概念
// a BehaviorSubject is used so that late subscribers also immediately get the most recent inner Observable
const higherOrderObservable = new BehaviorSubject<Observable<any>>(EMPTY);
// pass new Observable to listeners
higherOrderObservable.next(new Observable(..));
// get most recent inner Observable
const currentObservable = higherOrderObservable.pipe(switchMap(obs => obs));
currentObservable.subscribe(valueFromInnerObservable => { .. })
你的情况
为每个 端点 创建一个 BehaviorSubject
(tracker supplier) 发出 Observable (tracker ) 当前应该用于该 端点 。当应该为给定的 端点 使用不同的 tracker 时,将这个新的 Observable 传递给 BehaviorSubject
。让您的听众订阅 BehaviorSubject
(tracker supplier),自动为他们提供正确的 tracker,即切换到 Observable当前应该使用。
您的代码的简化版本可能如下所示。具体取决于您如何在整个应用程序中使用函数 createTracker
。
interface ConnectionTracker {
fullUri: string;
tracker$: ConnectableObservable<any>;
}
// Map an endpoint to a tracker supplier.
// This is your higher order Observable as it emits objects that wrap an Observable
store: { [key: string]: BehaviorSubject<ConnectionTracker> };
closeAllTrackers$ = new Subject();
// Creates a new tracker if necessary and returns a ConnectedObservable for that tracker.
// The ConnectedObservable will always resemble the current tracker.
createTracker<T>(endpoint: string, queryString: string = null): Observable<any> {
const fullUri = endpoint + (queryString ? `?${queryString}` : '');
// if no tracker supplier for the endpoint exists, create one
if (!store[endpoint]) {
store[endpoint] = new BehaviorSubject<ConnectionTracker>(null);
}
const currentTracker = store[endpoint].getValue();
// if no tracker exists or the current one is obsolete, create a new one
if (!currentTracker || currentTracker.fullUri !== fullUri) {
const tracker$ = new Observable<T>(subscriber => {
const source = new EventSource(fullUri, { withCredentials: true });
source.onmessage = e => subscriber.next(JSON.parse(e.data));
source.onerror = e => subscriber.error(e);
return () => source.close(); // on unsubscribe close the source
}).pipe(publish()) as ConnectableObservable<any>;
tracker$.connect();
// pass the new tracker to the tracker supplier
store[endpoint].next({ fullUri, tracker$ });
}
// return the tracker supplier for the given endpoint that always switches to the current tracker
return store[endpoint].pipe(
switchMap(tracker => tracker ? tracker.tracker$ : EMPTY), // switchMap will unsubscribe from the previous tracker and thus close the connection if a new tracker comes in
takeUntil(this.closeAllTrackers$) // complete the tracker supplier on emit
);
}
// close all trackers and remove the tracker suppliers
closeAllTrackers() {
this.closeAllTrackers$.next();
this.store = {};
}
如果您想立即关闭所有跟踪器连接并且现有订阅者应该收到 complete
通知,请致电 closeAllTrackers
。
如果您只想关闭一些跟踪器连接但不希望现有订阅者收到 complete
通知,以便他们继续监听将来提供的新跟踪器,请为每个跟踪器调用 store[trackerEndpoint].next(null)
。
如果您尝试做一些事情,例如将订阅者移动到不同的可观察对象,那么您只是没有按照 RxJS 中的预期去做。任何此类操纵基本上都是黑客行为。
如果您偶尔会产生一个新的可观察对象(例如通过发出请求)并且您希望某些订阅者始终订阅最新的观察对象,那么解决方案如下:
private observables: Subject<Observable<Data>> = new Subject();
getData(): Observable<Data> {
return this.observables.pipe(switchAll());
}
onMakingNewRequest(newObservable: Observable<Data>) {
this.observables.push(newObservable);
}
通过这种方式,您可以公开客户端订阅的单个可观察对象(通过 getData()
),但通过推送到 this.observables
,您可以更改用户看到的实际数据源。
至于关闭连接和类似的东西,你的可观察对象(根据每个请求或其他东西创建的)基本上应该负责在取消订阅时释放和关闭东西,然后你不需要做任何额外的事情处理,从您推送新的那一刻起,以前的可观察到的将自动取消订阅。详情取决于您联系的实际后端。
简单地说
给定一个现有的 Observable(尚未完成),有没有办法检索关联的订阅者(传递给 subscribe 的函数)以使他们订阅另一个 Observable?
上下文
我的应用程序中的一项服务有助于创建 SeverEvent 连接,将 ConnectableObservable 返回到代理连接并允许使用 进行多播publish 运算符。该服务通过内部存储跟踪现有连接:
store: {[key: string]: ConnectionTracker};
// …
interface ConnectionTracker {
url: string;
eventSource: EventSource;
observable: rx.ConnectableObservable<any>;
subscription: rx.Subscription;
observer: rx.Observer<any>;
data?: any; // Arbitrary data
}
创建连接后,如果关联的跟踪器已经存在(身份是使用连接的端点创建的),则服务应该:
- 确定
关闭现有跟踪器的 ServerEvent 连接 - ok
打开一个新的 SerevrEvent 连接(因此一个新的 ConnectableObservable) - 将现有跟踪器的 Observable 替换为新的 Observable 但让现有订阅者现在订阅新的 Observable
这是创建 ConnectionTrackers
的代码部分/**
* Create/Update a ServerEvent connection tracker
*/
createTracker<T>(endpoint: string, queryString: string = null): ConnectionTracker
{
let fullUri = endpoint + (queryString ? `?${queryString}` : '')
, tracker = this.findTrackerByEndpoint(endpoint) || {
observable: null,
fullUri: fullUri,
eventSource: null,
observer: null,
subscription: null
}
;
// Tracker exists
if (tracker.observable !== null) {
// If fullUri hasn't changed, use the tracker as is
if (tracker.fullUri === fullUri) {
return tracker;
}
// At this point, we know "fullUri" has changed, the tracker's
// connection should be replaced with a fresh one
// ⇒ TODO
// ⇒ Gather old tracker.observable's subscribers/subscriptions to make
// them subscribe to the new Observable instead (created down below)
// Terminate previous connection and clean related resouces
tracker.observer.complete();
tracker.eventSource.close();
}
tracker.eventSource = new EventSource(<any>fullUri, {withCredentials: true});
tracker.observable = rx.Observable.create((observer: rx.Observer<T>) => {
// Executed once
tracker.eventSource.onmessage = e => observer.next(JSON.parse(e.data));
tracker.eventSource.onerror = e => observer.error(e);
// Keep track of the observer
tracker.observer = observer;
})
// Transform Observable into a ConnectableObservable for multicast
.publish()
;
// Start emitting right away and also keep a reference to
// proxy subscription for later disposal
tracker.subscription = tracker.observable.connect();
return tracker;
}
谢谢。
与其尝试手动将订阅者从一个 Observable 转移到另一个,不如为侦听器提供一个 Observable,它会在需要时自动切换到另一个 Observable。
你通过使用一个高阶 Observable(一个发出 Observable 的 Observable)来做到这一点,它总是切换到最近的内部 Observable。
基本概念
// a BehaviorSubject is used so that late subscribers also immediately get the most recent inner Observable
const higherOrderObservable = new BehaviorSubject<Observable<any>>(EMPTY);
// pass new Observable to listeners
higherOrderObservable.next(new Observable(..));
// get most recent inner Observable
const currentObservable = higherOrderObservable.pipe(switchMap(obs => obs));
currentObservable.subscribe(valueFromInnerObservable => { .. })
你的情况
为每个 端点 创建一个 BehaviorSubject
(tracker supplier) 发出 Observable (tracker ) 当前应该用于该 端点 。当应该为给定的 端点 使用不同的 tracker 时,将这个新的 Observable 传递给 BehaviorSubject
。让您的听众订阅 BehaviorSubject
(tracker supplier),自动为他们提供正确的 tracker,即切换到 Observable当前应该使用。
您的代码的简化版本可能如下所示。具体取决于您如何在整个应用程序中使用函数 createTracker
。
interface ConnectionTracker {
fullUri: string;
tracker$: ConnectableObservable<any>;
}
// Map an endpoint to a tracker supplier.
// This is your higher order Observable as it emits objects that wrap an Observable
store: { [key: string]: BehaviorSubject<ConnectionTracker> };
closeAllTrackers$ = new Subject();
// Creates a new tracker if necessary and returns a ConnectedObservable for that tracker.
// The ConnectedObservable will always resemble the current tracker.
createTracker<T>(endpoint: string, queryString: string = null): Observable<any> {
const fullUri = endpoint + (queryString ? `?${queryString}` : '');
// if no tracker supplier for the endpoint exists, create one
if (!store[endpoint]) {
store[endpoint] = new BehaviorSubject<ConnectionTracker>(null);
}
const currentTracker = store[endpoint].getValue();
// if no tracker exists or the current one is obsolete, create a new one
if (!currentTracker || currentTracker.fullUri !== fullUri) {
const tracker$ = new Observable<T>(subscriber => {
const source = new EventSource(fullUri, { withCredentials: true });
source.onmessage = e => subscriber.next(JSON.parse(e.data));
source.onerror = e => subscriber.error(e);
return () => source.close(); // on unsubscribe close the source
}).pipe(publish()) as ConnectableObservable<any>;
tracker$.connect();
// pass the new tracker to the tracker supplier
store[endpoint].next({ fullUri, tracker$ });
}
// return the tracker supplier for the given endpoint that always switches to the current tracker
return store[endpoint].pipe(
switchMap(tracker => tracker ? tracker.tracker$ : EMPTY), // switchMap will unsubscribe from the previous tracker and thus close the connection if a new tracker comes in
takeUntil(this.closeAllTrackers$) // complete the tracker supplier on emit
);
}
// close all trackers and remove the tracker suppliers
closeAllTrackers() {
this.closeAllTrackers$.next();
this.store = {};
}
如果您想立即关闭所有跟踪器连接并且现有订阅者应该收到 complete
通知,请致电 closeAllTrackers
。
如果您只想关闭一些跟踪器连接但不希望现有订阅者收到 complete
通知,以便他们继续监听将来提供的新跟踪器,请为每个跟踪器调用 store[trackerEndpoint].next(null)
。
如果您尝试做一些事情,例如将订阅者移动到不同的可观察对象,那么您只是没有按照 RxJS 中的预期去做。任何此类操纵基本上都是黑客行为。
如果您偶尔会产生一个新的可观察对象(例如通过发出请求)并且您希望某些订阅者始终订阅最新的观察对象,那么解决方案如下:
private observables: Subject<Observable<Data>> = new Subject();
getData(): Observable<Data> {
return this.observables.pipe(switchAll());
}
onMakingNewRequest(newObservable: Observable<Data>) {
this.observables.push(newObservable);
}
通过这种方式,您可以公开客户端订阅的单个可观察对象(通过 getData()
),但通过推送到 this.observables
,您可以更改用户看到的实际数据源。
至于关闭连接和类似的东西,你的可观察对象(根据每个请求或其他东西创建的)基本上应该负责在取消订阅时释放和关闭东西,然后你不需要做任何额外的事情处理,从您推送新的那一刻起,以前的可观察到的将自动取消订阅。详情取决于您联系的实际后端。