如何在不关闭底层网络套接字的情况下将 `firstValueFrom` 与 `WebSocketSubject` 一起使用?

How can I use `firstValueFrom` with `WebSocketSubject` without closing the underlying web socket?

我正在使用 WebSocketSubject,我经常想在给定事件到达之前阻止执行,这就是我使用 firstValueFrom 的原因,如以下代码所示:

let websocket = new WebSocketSubject<any>(url);
let firstMessage = await firstValueFrom(websocket.pipe(filter(m => true));

我只有一个问题,即 firstValueFrom 在解决承诺时调用 websocket.unsubscribe(),但在 WebSocketSubject 上具有关闭底层 Web Socket 的效果,我想保持开放!

目前,我想到了几个可能的出路:

简而言之,我怀疑我缺少一些基本的东西(例如适当的 OperatorFunction),这些东西可以让我做到这一点,以便 firstValueFrom 进行的 unsubscribe 调用不会导致底层网络套接字被关闭。

本质上,您希望始终有一个订阅,以便套接字连接保持打开状态。我认为 firstValueFrom 不是完成这项工作的合适工具。我认为只创建一个显式订阅更简单。

如果打算在应用程序的整个生命周期内保持打开状态,只需在应用程序启动时订阅即可。

由于要过滤掉前几个发射直到满足某些条件,可以使用skipWhile:

const websocket = new WebSocketSubject<any>(url);
const messages = websocket.pipe(skipWhile(m => m !== 'my special event'));

websocket.subscribe(); // keep socket open


// listen
messages.subscribe(m => console.log('message received:', m);

// send
websocket.next('hello server');

可能值得在 rxjs websocket 周围创建一个轻型包装器 class 来处理保持连接打开并过滤掉前几个事件:

class MyWebsocket {
  private websocket = new WebSocketSubject<any>(this.url);
  public messages = websocket.pipe(skipWhile(m => m !== 'my special event'));

  constructor(private url) {
    this.websocket.subscribe(); // keep socket open
  }
 
  public sendMessage(message: any) {
    this.websocket.sendMessage(message);
  }
}
const websocket = new MyWebsocket(url);

// listen
websocket.messages.subscribe(m => console.log('message received:', m);

// send
websocket.sendMessage('hello server');