RXJS 为套接字创建 Observable

RXJS Create Observable for sockets

我正在尝试 return 来自允许您收听任何 Pusher 频道的服务的 Observable。

工作正常,当 Observable(使用 createRealtimeObservable(...) 生成)被取消订阅时,我希望从推送器频道 unsubscribe

有什么想法吗?

  public realtimeObservable (channelName, eventName): Observable<any> {

        const realtimeObservable$ = new Observable((observer) => {

          const channel = this.pusher.subscribe(`private-organization-${this.authService.userProfile.organization_id}-${channelName}`)
          channel.bind(eventName, (data) => {
            observer.next(data.payload)
          })

        })

        return realtimeObservable$
      }

当您自己手动创建一个 Observable 时,您有责任在取消订阅时清理所有资源。幸运的是 Observable 有一个机制允许你在创建时 return 一个 unsubscribe/dispose 函数:

return Observable.create((obs) => {
  const channel = this.pusher.subscribe(`private-organization-${this.authService.userProfile.organization_id}-${channelName}`)
  channel.bind(eventName, (data) => {
    observer.next(data.payload)
  });

  return () => {
    // unsubscribe event
    channel.unsubscribe(); // NOTE: i do not know the syntax for unsubscribing a Pusher channel, impl as required.
  };
});

注意:Observable.create()new Observable()

的语法糖