RxJs:如何跨多个组件维护一个 WebSocket 连接?
RxJs: how to maintain one WebSocket connection across multiple components?
我有一个由多个组件组成的聊天应用程序(例如,一个组件包含群组列表,另一个组件包含消息等),当打开一个组件时,必须关闭另一个组件。
这是我用来提供 webSocket 连接的服务:
@Injectable()
export class ChatConnectionService {
private _connection: Subject<any>;
constructor(private window: WindowAbstract) { }
public closeConnection(): void {
this.connection.complete();
}
public send(msg: ChatRequest): void {
this.connection.next(msg);
}
public messages(): Subject<any> {
return this.connection;
}
private get connection(): Subject<any> {
if (!this._connection || this._connection.closed) {
this._connection = webSocket(`https/myapp/chat`);
}
return this._connection;
}
}
下面是 some.component.ts
中的用法示例:
private messages$: Subscription;
ngOnInit(): void {
this.messages$ = this.chatConnectionService.messages()
.subscribe(m => this.handleInboundMessage(m));
}
ngOnDestroy() {
this.messages$.unsubscribe();
}
现在的问题是,当我在组件之间切换时,一旦旧组件被销毁而取消订阅,新订阅似乎不起作用,它不发送或接收消息,也不会产生任何错误还有。
但是当我删除取消订阅行时它开始工作,但也继续使用旧订阅接收消息,这不是我真正想要的。
所以总结一下——我需要保持一个连接,但有可能让多个组件根据需要订阅和取消订阅,是否可以用 RxJs 来完成它?或者我做错了什么?任何建议将不胜感激,谢谢!
问题是您为组件提供了对从 webSocket('https/myapp/chat')
返回的 WebSocketSubject
的直接引用。如果您在 WebSocketSubject
上调用 .unsubscribe()
方法,它会关闭通道,这就是没有其他组件检测到消息的原因。
声明一个从私有 WebSocketSubject
.
管道传输的 public observable,而不是做 getter
@Injectable()
export class ChatConnectionService {
public messages$ = Observable<any>; // Replace 'any' with WS response type.
private connection: WebSocketSubject;
constructor(private window: WindowAbstract) {
this.connection = webSocket(`https/myapp/chat`);
this.messages$ = this.connection.asObservable();
}
}
您的组件现在将取消订阅 observable,而不是调用 WebSocketSubject 来取消订阅频道。
Update 在对 webSocket()
实例化的 WebSocketSubject 进行进一步审查后,我意识到 .asObservable()
不是您所处情况的最佳用例。即使您的组件取消订阅可观察对象,Subject 仍会从开放的 WS 通道接收新数据。
this.messages$ = this.connection.multiplex(
() => { 'send message to WS server to start' },
() => { 'send message to WS server to stop' },
(message) => true // filter messages by condition
);
您可以使用 multiplex()
方法从您的 WebSocketSubject 声明一个可观察对象。它需要三个参数:
- 为 WS 声明开始消息的函数。
- 为 WS 声明结束消息的函数。
- 过滤消息的功能
现在,每当任何组件订阅 messages$
时,它都会触发启动消息,为订阅者打开频道。当所有可观察对象取消订阅 messages$
时,它会发送关闭信号以停止任何进一步的消息。
这节省了大量用于维护 WS 的单例状态的样板文件。
我会把它留在这里给最终遇到同样情况的人。
您始终需要至少一个订阅 运行,这样当您取消订阅时它不会关闭连接。
这就是我所做的:
@NgModule({
imports: [
...
],
providers: [
{
provide: APP_INITIALIZER,
multi: true,
// subscribing establishes a connections
useFactory: (chat: ChatConnectionService) => () => chat.messages$.subscribe(),
deps: [ChatConnectionService]
}
]
})
export class AppModule {
}
这里不太明显的陷阱是取消订阅 WebSocketSubject
和通过订阅主题创建的 Observable
具有相同的效果 - 如果它是最后一次订阅。
我有一个由多个组件组成的聊天应用程序(例如,一个组件包含群组列表,另一个组件包含消息等),当打开一个组件时,必须关闭另一个组件。
这是我用来提供 webSocket 连接的服务:
@Injectable()
export class ChatConnectionService {
private _connection: Subject<any>;
constructor(private window: WindowAbstract) { }
public closeConnection(): void {
this.connection.complete();
}
public send(msg: ChatRequest): void {
this.connection.next(msg);
}
public messages(): Subject<any> {
return this.connection;
}
private get connection(): Subject<any> {
if (!this._connection || this._connection.closed) {
this._connection = webSocket(`https/myapp/chat`);
}
return this._connection;
}
}
下面是 some.component.ts
中的用法示例:
private messages$: Subscription;
ngOnInit(): void {
this.messages$ = this.chatConnectionService.messages()
.subscribe(m => this.handleInboundMessage(m));
}
ngOnDestroy() {
this.messages$.unsubscribe();
}
现在的问题是,当我在组件之间切换时,一旦旧组件被销毁而取消订阅,新订阅似乎不起作用,它不发送或接收消息,也不会产生任何错误还有。
但是当我删除取消订阅行时它开始工作,但也继续使用旧订阅接收消息,这不是我真正想要的。
所以总结一下——我需要保持一个连接,但有可能让多个组件根据需要订阅和取消订阅,是否可以用 RxJs 来完成它?或者我做错了什么?任何建议将不胜感激,谢谢!
问题是您为组件提供了对从 webSocket('https/myapp/chat')
返回的 WebSocketSubject
的直接引用。如果您在 WebSocketSubject
上调用 .unsubscribe()
方法,它会关闭通道,这就是没有其他组件检测到消息的原因。
声明一个从私有 WebSocketSubject
.
@Injectable()
export class ChatConnectionService {
public messages$ = Observable<any>; // Replace 'any' with WS response type.
private connection: WebSocketSubject;
constructor(private window: WindowAbstract) {
this.connection = webSocket(`https/myapp/chat`);
this.messages$ = this.connection.asObservable();
}
}
您的组件现在将取消订阅 observable,而不是调用 WebSocketSubject 来取消订阅频道。
Update 在对 webSocket()
实例化的 WebSocketSubject 进行进一步审查后,我意识到 .asObservable()
不是您所处情况的最佳用例。即使您的组件取消订阅可观察对象,Subject 仍会从开放的 WS 通道接收新数据。
this.messages$ = this.connection.multiplex(
() => { 'send message to WS server to start' },
() => { 'send message to WS server to stop' },
(message) => true // filter messages by condition
);
您可以使用 multiplex()
方法从您的 WebSocketSubject 声明一个可观察对象。它需要三个参数:
- 为 WS 声明开始消息的函数。
- 为 WS 声明结束消息的函数。
- 过滤消息的功能
现在,每当任何组件订阅 messages$
时,它都会触发启动消息,为订阅者打开频道。当所有可观察对象取消订阅 messages$
时,它会发送关闭信号以停止任何进一步的消息。
这节省了大量用于维护 WS 的单例状态的样板文件。
我会把它留在这里给最终遇到同样情况的人。
您始终需要至少一个订阅 运行,这样当您取消订阅时它不会关闭连接。
这就是我所做的:
@NgModule({
imports: [
...
],
providers: [
{
provide: APP_INITIALIZER,
multi: true,
// subscribing establishes a connections
useFactory: (chat: ChatConnectionService) => () => chat.messages$.subscribe(),
deps: [ChatConnectionService]
}
]
})
export class AppModule {
}
这里不太明显的陷阱是取消订阅 WebSocketSubject
和通过订阅主题创建的 Observable
具有相同的效果 - 如果它是最后一次订阅。