如何在不关闭底层网络套接字的情况下将 `firstValueFrom` 与 `WebSocketSubject` 一起使用?
How can I use `firstValueFrom` with `WebSocketSubject` without closing the underlying web socket?
我正在使用 WebSocketSubject
,我经常想在给定事件到达之前阻止执行,这就是我使用 firstValueFrom
的原因,如以下代码所示:
let websocket = new WebSocketSubject<any>(url);
let firstMessage = await firstValueFrom(websocket.pipe(filter(m => true));
我只有一个问题,即 firstValueFrom
在解决承诺时调用 websocket.unsubscribe()
,但在 WebSocketSubject
上具有关闭底层 Web Socket 的效果,我想保持开放!
目前,我想到了几个可能的出路:
- 写一个不取消订阅的
firstValueFrom
的等价物。
反驳:除了一个小问题,我宁愿不重新实现一个近乎完美的功能;
- 使用另一个将订阅
WebSocketSubject
的 Subject
,我将在该主题上使用 firstValueFrom
。
反论点:在使用方面,我发现有两个 Subject
对象可能会造成混淆,并且必须知道使用哪一个(例如,使用 websocket.next
发送上游消息,只使用 websocketProxy
接收消息,永远不会 混淆两者!);
- 使用
multiplex
创建临时 Observable
对象,然后 firstValueFrom
可以毫无问题地关闭这些对象。
反论点:因为在这种情况下我实际上并没有多路复用,所以我宁愿不使用那个方法,它的签名和用法对我的用例来说似乎有点过分了。
简而言之,我怀疑我缺少一些基本的东西(例如适当的 OperatorFunction
),这些东西可以让我做到这一点,以便 firstValueFrom
进行的 unsubscribe
调用不会导致底层网络套接字被关闭。
本质上,您希望始终有一个订阅,以便套接字连接保持打开状态。我认为 firstValueFrom
不是完成这项工作的合适工具。我认为只创建一个显式订阅更简单。
如果打算在应用程序的整个生命周期内保持打开状态,只需在应用程序启动时订阅即可。
由于要过滤掉前几个发射直到满足某些条件,可以使用skipWhile
:
const websocket = new WebSocketSubject<any>(url);
const messages = websocket.pipe(skipWhile(m => m !== 'my special event'));
websocket.subscribe(); // keep socket open
// listen
messages.subscribe(m => console.log('message received:', m);
// send
websocket.next('hello server');
可能值得在 rxjs websocket 周围创建一个轻型包装器 class 来处理保持连接打开并过滤掉前几个事件:
class MyWebsocket {
private websocket = new WebSocketSubject<any>(this.url);
public messages = websocket.pipe(skipWhile(m => m !== 'my special event'));
constructor(private url) {
this.websocket.subscribe(); // keep socket open
}
public sendMessage(message: any) {
this.websocket.sendMessage(message);
}
}
const websocket = new MyWebsocket(url);
// listen
websocket.messages.subscribe(m => console.log('message received:', m);
// send
websocket.sendMessage('hello server');
我正在使用 WebSocketSubject
,我经常想在给定事件到达之前阻止执行,这就是我使用 firstValueFrom
的原因,如以下代码所示:
let websocket = new WebSocketSubject<any>(url);
let firstMessage = await firstValueFrom(websocket.pipe(filter(m => true));
我只有一个问题,即 firstValueFrom
在解决承诺时调用 websocket.unsubscribe()
,但在 WebSocketSubject
上具有关闭底层 Web Socket 的效果,我想保持开放!
目前,我想到了几个可能的出路:
- 写一个不取消订阅的
firstValueFrom
的等价物。
反驳:除了一个小问题,我宁愿不重新实现一个近乎完美的功能; - 使用另一个将订阅
WebSocketSubject
的Subject
,我将在该主题上使用firstValueFrom
。
反论点:在使用方面,我发现有两个Subject
对象可能会造成混淆,并且必须知道使用哪一个(例如,使用websocket.next
发送上游消息,只使用websocketProxy
接收消息,永远不会 混淆两者!); - 使用
multiplex
创建临时Observable
对象,然后firstValueFrom
可以毫无问题地关闭这些对象。
反论点:因为在这种情况下我实际上并没有多路复用,所以我宁愿不使用那个方法,它的签名和用法对我的用例来说似乎有点过分了。
简而言之,我怀疑我缺少一些基本的东西(例如适当的 OperatorFunction
),这些东西可以让我做到这一点,以便 firstValueFrom
进行的 unsubscribe
调用不会导致底层网络套接字被关闭。
本质上,您希望始终有一个订阅,以便套接字连接保持打开状态。我认为 firstValueFrom
不是完成这项工作的合适工具。我认为只创建一个显式订阅更简单。
如果打算在应用程序的整个生命周期内保持打开状态,只需在应用程序启动时订阅即可。
由于要过滤掉前几个发射直到满足某些条件,可以使用skipWhile
:
const websocket = new WebSocketSubject<any>(url);
const messages = websocket.pipe(skipWhile(m => m !== 'my special event'));
websocket.subscribe(); // keep socket open
// listen
messages.subscribe(m => console.log('message received:', m);
// send
websocket.next('hello server');
可能值得在 rxjs websocket 周围创建一个轻型包装器 class 来处理保持连接打开并过滤掉前几个事件:
class MyWebsocket {
private websocket = new WebSocketSubject<any>(this.url);
public messages = websocket.pipe(skipWhile(m => m !== 'my special event'));
constructor(private url) {
this.websocket.subscribe(); // keep socket open
}
public sendMessage(message: any) {
this.websocket.sendMessage(message);
}
}
const websocket = new MyWebsocket(url);
// listen
websocket.messages.subscribe(m => console.log('message received:', m);
// send
websocket.sendMessage('hello server');