RXJS 为套接字创建 Observable
RXJS Create Observable for sockets
我正在尝试 return 来自允许您收听任何 Pusher 频道的服务的 Observable。
工作正常,当 Observable(使用 createRealtimeObservable(...)
生成)被取消订阅时,我希望从推送器频道 unsubscribe
。
有什么想法吗?
public realtimeObservable (channelName, eventName): Observable<any> {
const realtimeObservable$ = new Observable((observer) => {
const channel = this.pusher.subscribe(`private-organization-${this.authService.userProfile.organization_id}-${channelName}`)
channel.bind(eventName, (data) => {
observer.next(data.payload)
})
})
return realtimeObservable$
}
当您自己手动创建一个 Observable 时,您有责任在取消订阅时清理所有资源。幸运的是 Observable 有一个机制允许你在创建时 return 一个 unsubscribe/dispose 函数:
return Observable.create((obs) => {
const channel = this.pusher.subscribe(`private-organization-${this.authService.userProfile.organization_id}-${channelName}`)
channel.bind(eventName, (data) => {
observer.next(data.payload)
});
return () => {
// unsubscribe event
channel.unsubscribe(); // NOTE: i do not know the syntax for unsubscribing a Pusher channel, impl as required.
};
});
注意:Observable.create()
是 new Observable()
的语法糖
我正在尝试 return 来自允许您收听任何 Pusher 频道的服务的 Observable。
工作正常,当 Observable(使用 createRealtimeObservable(...)
生成)被取消订阅时,我希望从推送器频道 unsubscribe
。
有什么想法吗?
public realtimeObservable (channelName, eventName): Observable<any> {
const realtimeObservable$ = new Observable((observer) => {
const channel = this.pusher.subscribe(`private-organization-${this.authService.userProfile.organization_id}-${channelName}`)
channel.bind(eventName, (data) => {
observer.next(data.payload)
})
})
return realtimeObservable$
}
当您自己手动创建一个 Observable 时,您有责任在取消订阅时清理所有资源。幸运的是 Observable 有一个机制允许你在创建时 return 一个 unsubscribe/dispose 函数:
return Observable.create((obs) => {
const channel = this.pusher.subscribe(`private-organization-${this.authService.userProfile.organization_id}-${channelName}`)
channel.bind(eventName, (data) => {
observer.next(data.payload)
});
return () => {
// unsubscribe event
channel.unsubscribe(); // NOTE: i do not know the syntax for unsubscribing a Pusher channel, impl as required.
};
});
注意:Observable.create()
是 new Observable()