RxJS:使用 Websockets 和 Stomp 自动(断开)连接(取消)订阅

RxJS: Auto (dis)connect on (un)subscribe with Websockets and Stomp

我正在为 Stomp over Websockets 构建一个小的 RxJS 包装器,它已经可以工作了。

但现在我想到了一个非常酷的功能,可以(希望 - 如果我错了请纠正我)使用 RxJS 轻松完成。

当前行为:

myStompWrapper.configure("/stomp_endpoint");
myStompWrapper.connect();      // onSuccess: set state to CONNECTED

// state (Observable) can be DISCONNECTED or CONNECTED
var subscription = myStompWrapper.getState()
    .filter(state => state == "CONNECTED")
    .flatMap(myStompWrapper.subscribeDestination("/foo"))
    .subscribe(msg => console.log(msg));

// ... and some time later:
subscription.unsubscribe();    // calls 'unsubscribe' for this stomp destination
myStompWrapper.disconnect();   // disconnects the stomp websocket connection

如您所见,我必须等待state == "CONNECTED"才能订阅subscribeDestination(..)。否则我会从 Stomp 库中得到一个错误。

新行为:

下一个实现应该会让用户更轻松。这是我的想象:

myStompWrapper.configure("/stomp_endpoint");

var subscription = myStompWrapper.subscribeDestination("/foo")
    .subscribe(msg => console.log(msg));

// ... and some time later:
subscription.unsubscribe();

它在内部应该如何工作:

  1. configure 只能在 DISCONNECTED
  2. 时调用
  3. 调用subscribeDestination时,有两种可能:
    1. if CONNECTED: 只需订阅目的地
    2. if DISCONNECTED:先调用connect(),再订阅目的地
  4. 调用unsubscribe时,有两种可能:
    1. 如果这是最后一次订阅:调用 disconnect()
    2. 如果这不是最后一次订阅:什么都不做

我还不确定如何到达那里,但这就是我在这里问这个问题的原因 ;-)

提前致谢!

编辑:更多代码、示例和解释

configure()not 断开连接时被调用时,它应该抛出一个 Error。但这没什么大不了的。

stompClient.connect(..) 是非阻塞的。它有一个 onSuccess 回调:

public connect() {
  stompClient.connect({}, this.onSuccess, this.errorHandler);
}

public onSuccess = () => {
  this.state.next(State.CONNECTED);
}

observeDestination(..) 订阅一个 Stomp 消息通道(= 目的地)和 returns 一个 Rx.Observable 然后可以用来取消订阅此 Stomp 消息频道:

public observeDestination(destination: string) {
  return this.state
      .filter(state => state == State.CONNECTED)
      .flatMap(_ => Rx.Observable.create(observer => {
        let stompSubscription = this.client.subscribe(
            destination,
            message => observer.next(message),
            {}
        );

        return () => {
          stompSubscription.unsubscribe();
        }
      }));
}

可以这样使用:

myStompWrapper.configure("/stomp_endpoint");
myStompWrapper.connect();

myStompWrapper.observeDestination("/foo")
    .subscribe(..);

myStompWrapper.observeDestination("/bar")
    .subscribe(..);

现在我想去掉 myStompWrapper.connect()。当第一个通过调用 observeDestination(..).subscribe(..) 进行订阅时,代码应该自动调用 this.connect() 并且当最后一个调用 unsubscribe().

时它应该调用 this.disconnect()

示例:

myStompWrapper.configure("/stomp_endpoint");

let subscription1 = myStompWrapper.observeDestination("/foo")
    .subscribe(..); // execute connect(), because this
                    // is the first subscription

let subscription2 = myStompWrapper.observeDestination("/bar")
    .subscribe(..);

subscription2.unsubscribe();
subscription1.unsubscribe(); // execute disconnect(), because this 
                             // was the last subscription

我同意您建议将代码藏到 myStompWrapper 中,在它的新家会更快乐。

我仍然建议使用 observeDestination 而不是 subscribeDestination("/foo") 这样的名称,因为您实际上并不是从该方法订阅,而只是完成您的可观察链。

  1. configure()只能在DISCONNECTED

    时调用

    您没有在此处指定如果它被调用而不是 DISCONNECTED 应该发生什么。由于您似乎没有 returning 此处要使用的任何值,因此我假设您打算在异常状态不便时抛出异常。为了跟踪此类状态,我将使用以 DISCONNECTED 的初始值开头的 BehaviourSubject。您可能希望将状态保持在 observeDestination 以内决定是否抛出异常

  2. 如果已连接:只需订阅目的地

    如果 DISCONNECTED:首先调用 connect(),然后订阅目的地

    正如我之前提到的,我认为如果订阅不在 subscribeDestination("/foo") 内发生,而是您只是构建您的可观察链,您会更开心。因为在某些情况下您只想调用 connect(),所以我会在包含状态条件的可观察链中简单地使用 .do() 调用。

  3. 要使用 rx-y 逻辑,您可能希望调用 disconnect() 作为您的可观察取消订阅的一部分,并简单地 return 一个共享的 refcounted 可观察开始.这样,每个新订阅者都不会重新创建新订阅,而是 .refCount() 将对可观察链进行一次订阅,一旦下游没有更多订阅者,unsubscribe()

假设消息以 this.observedData$ 的形式传入 myStompWrapper 我建议的代码作为 myStompWrapper 的一部分看起来像这样:

observeDestination() {
  return Rx.Observable.create(function (observer) {
     var subscription = this.getState()
             .filter(state => state == "CONNECTED")
             .do(state => state ? this.connect() : Observable.of(true))
             .switchMap(this.observedData$)
             .refCount();
             .subscribe(value => {
               try {
                 subscriber.next(someCallback(value));
               } catch(err) {
                 subscriber.error(err);
               }
             },
             err => subscriber.error(err),
             () => subscriber.complete());

 return { unsubscribe() { this.disconnect(); subscription.unsubscribe(); } };
}

因为我遗漏了您的一些代码,所以我允许自己不测试我的代码。但希望它能说明并呈现我在回答中提到的概念。