RxJs:如何跨多个组件维护一个 WebSocket 连接?

RxJs: how to maintain one WebSocket connection across multiple components?

我有一个由多个组件组成的聊天应用程序(例如,一个组件包含群组列表,另一个组件包含消息等),当打开一个组件时,必须关闭另一个组件。

这是我用来提供 webSocket 连接的服务:

@Injectable()
export class ChatConnectionService {

  private _connection: Subject<any>;

  constructor(private window: WindowAbstract) { }

  public closeConnection(): void {
    this.connection.complete();
  }

  public send(msg: ChatRequest): void {
    this.connection.next(msg);
  }

  public messages(): Subject<any> {
    return this.connection;
  }

  private get connection(): Subject<any> {
    if (!this._connection || this._connection.closed) {
      this._connection = webSocket(`https/myapp/chat`);
    }

    return this._connection;
  }
}

下面是 some.component.ts 中的用法示例:

  private messages$: Subscription;

  ngOnInit(): void {
    this.messages$ = this.chatConnectionService.messages()
      .subscribe(m => this.handleInboundMessage(m));
  }

  ngOnDestroy() {
    this.messages$.unsubscribe();
  }

现在的问题是,当我在组件之间切换时,一旦旧组件被销毁而取消订阅,新订阅似乎不起作用,它不发送或接收消息,也不会产生任何错误还有。

但是当我删除取消订阅行时它开始工作,但也继续使用旧订阅接收消息,这不是我真正想要的。

所以总结一下——我需要保持一个连接,但有可能让多个组件根据需要订阅和取消订阅,是否可以用 RxJs 来完成它?或者我做错了什么?任何建议将不胜感激,谢谢!

问题是您为组件提供了对从 webSocket('https/myapp/chat') 返回的 WebSocketSubject 的直接引用。如果您在 WebSocketSubject 上调用 .unsubscribe() 方法,它会关闭通道,这就是没有其他组件检测到消息的原因。

声明一个从私有 WebSocketSubject.

管道传输的 public observable,而不是做 getter
@Injectable()
export class ChatConnectionService {

  public messages$ = Observable<any>; // Replace 'any' with WS response type.
  private connection: WebSocketSubject;

  constructor(private window: WindowAbstract) { 
    this.connection = webSocket(`https/myapp/chat`);
    this.messages$ = this.connection.asObservable();
  }

}

您的组件现在将取消订阅 observable,而不是调用 WebSocketSubject 来取消订阅频道。


Update 在对 webSocket() 实例化的 WebSocketSubject 进行进一步审查后,我意识到 .asObservable() 不是您所处情况的最佳用例。即使您的组件取消订阅可观察对象,Subject 仍会从开放的 WS 通道接收新数据。

this.messages$ = this.connection.multiplex(
  () => { 'send message to WS server to start' }, 
  () => { 'send message to WS server to stop' }, 
  (message) => true // filter messages by condition
);

您可以使用 multiplex() 方法从您的 WebSocketSubject 声明一个可观察对象。它需要三个参数:

  1. 为 WS 声明开始消息的函数。
  2. 为 WS 声明结束消息的函数。
  3. 过滤消息的功能

现在,每当任何组件订阅 messages$ 时,它都会触发启动消息,为订阅者打开频道。当所有可观察对象取消订阅 messages$ 时,它会发送关闭信号以停止任何进一步的消息。

这节省了大量用于维护 WS 的单例状态的样板文件。

参考:https://rxjs.dev/api/webSocket/WebSocketSubject

我会把它留在这里给最终遇到同样情况的人。

您始终需要至少一个订阅 运行,这样当您取消订阅时它不会关闭连接。

这就是我所做的:

@NgModule({
  imports: [
     ...
  ],
  providers: [
    {
        provide: APP_INITIALIZER,
        multi: true,
        // subscribing establishes a connections
        useFactory: (chat: ChatConnectionService) => () => chat.messages$.subscribe(),
        deps: [ChatConnectionService]
    }
  ]
})

export class AppModule {
}

这里不太明显的陷阱是取消订阅 WebSocketSubject 和通过订阅主题创建的 Observable 具有相同的效果 - 如果它是最后一次订阅。