RxJS:使用 Websockets 和 Stomp 自动(断开)连接(取消)订阅
RxJS: Auto (dis)connect on (un)subscribe with Websockets and Stomp
我正在为 Stomp over Websockets 构建一个小的 RxJS 包装器,它已经可以工作了。
但现在我想到了一个非常酷的功能,可以(希望 - 如果我错了请纠正我)使用 RxJS 轻松完成。
当前行为:
myStompWrapper.configure("/stomp_endpoint");
myStompWrapper.connect(); // onSuccess: set state to CONNECTED
// state (Observable) can be DISCONNECTED or CONNECTED
var subscription = myStompWrapper.getState()
.filter(state => state == "CONNECTED")
.flatMap(myStompWrapper.subscribeDestination("/foo"))
.subscribe(msg => console.log(msg));
// ... and some time later:
subscription.unsubscribe(); // calls 'unsubscribe' for this stomp destination
myStompWrapper.disconnect(); // disconnects the stomp websocket connection
如您所见,我必须等待state == "CONNECTED"
才能订阅subscribeDestination(..)
。否则我会从 Stomp 库中得到一个错误。
新行为:
下一个实现应该会让用户更轻松。这是我的想象:
myStompWrapper.configure("/stomp_endpoint");
var subscription = myStompWrapper.subscribeDestination("/foo")
.subscribe(msg => console.log(msg));
// ... and some time later:
subscription.unsubscribe();
它在内部应该如何工作:
configure
只能在 DISCONNECTED
时调用
- 调用
subscribeDestination
时,有两种可能:
- if
CONNECTED
: 只需订阅目的地
- if
DISCONNECTED
:先调用connect()
,再订阅目的地
- 调用
unsubscribe
时,有两种可能:
- 如果这是最后一次订阅:调用
disconnect()
- 如果这不是最后一次订阅:什么都不做
我还不确定如何到达那里,但这就是我在这里问这个问题的原因 ;-)
提前致谢!
编辑:更多代码、示例和解释
当 configure() 在 not 断开连接时被调用时,它应该抛出一个 Error
。但这没什么大不了的。
stompClient.connect(..) 是非阻塞的。它有一个 onSuccess
回调:
public connect() {
stompClient.connect({}, this.onSuccess, this.errorHandler);
}
public onSuccess = () => {
this.state.next(State.CONNECTED);
}
observeDestination(..) 订阅一个 Stomp 消息通道(= 目的地)和 returns 一个 Rx.Observable
然后可以用来取消订阅此 Stomp 消息频道:
public observeDestination(destination: string) {
return this.state
.filter(state => state == State.CONNECTED)
.flatMap(_ => Rx.Observable.create(observer => {
let stompSubscription = this.client.subscribe(
destination,
message => observer.next(message),
{}
);
return () => {
stompSubscription.unsubscribe();
}
}));
}
可以这样使用:
myStompWrapper.configure("/stomp_endpoint");
myStompWrapper.connect();
myStompWrapper.observeDestination("/foo")
.subscribe(..);
myStompWrapper.observeDestination("/bar")
.subscribe(..);
现在我想去掉 myStompWrapper.connect()
。当第一个通过调用 observeDestination(..).subscribe(..)
进行订阅时,代码应该自动调用 this.connect()
并且当最后一个调用 unsubscribe()
.
时它应该调用 this.disconnect()
示例:
myStompWrapper.configure("/stomp_endpoint");
let subscription1 = myStompWrapper.observeDestination("/foo")
.subscribe(..); // execute connect(), because this
// is the first subscription
let subscription2 = myStompWrapper.observeDestination("/bar")
.subscribe(..);
subscription2.unsubscribe();
subscription1.unsubscribe(); // execute disconnect(), because this
// was the last subscription
我同意您建议将代码藏到 myStompWrapper 中,在它的新家会更快乐。
我仍然建议使用 observeDestination
而不是 subscribeDestination("/foo")
这样的名称,因为您实际上并不是从该方法订阅,而只是完成您的可观察链。
configure()
只能在DISCONNECTED
时调用
您没有在此处指定如果它被调用而不是 DISCONNECTED
应该发生什么。由于您似乎没有 returning 此处要使用的任何值,因此我假设您打算在异常状态不便时抛出异常。为了跟踪此类状态,我将使用以 DISCONNECTED
的初始值开头的 BehaviourSubject
。您可能希望将状态保持在 observeDestination
以内决定是否抛出异常
如果已连接:只需订阅目的地
如果 DISCONNECTED:首先调用 connect(),然后订阅目的地
正如我之前提到的,我认为如果订阅不在 subscribeDestination("/foo")
内发生,而是您只是构建您的可观察链,您会更开心。因为在某些情况下您只想调用 connect()
,所以我会在包含状态条件的可观察链中简单地使用 .do()
调用。
要使用 rx-y 逻辑,您可能希望调用 disconnect()
作为您的可观察取消订阅的一部分,并简单地 return 一个共享的 refcounted 可观察开始.这样,每个新订阅者都不会重新创建新订阅,而是 .refCount()
将对可观察链进行一次订阅,一旦下游没有更多订阅者,unsubscribe()
。
假设消息以 this.observedData$ 的形式传入 myStompWrapper
我建议的代码作为 myStompWrapper
的一部分看起来像这样:
observeDestination() {
return Rx.Observable.create(function (observer) {
var subscription = this.getState()
.filter(state => state == "CONNECTED")
.do(state => state ? this.connect() : Observable.of(true))
.switchMap(this.observedData$)
.refCount();
.subscribe(value => {
try {
subscriber.next(someCallback(value));
} catch(err) {
subscriber.error(err);
}
},
err => subscriber.error(err),
() => subscriber.complete());
return { unsubscribe() { this.disconnect(); subscription.unsubscribe(); } };
}
因为我遗漏了您的一些代码,所以我允许自己不测试我的代码。但希望它能说明并呈现我在回答中提到的概念。
我正在为 Stomp over Websockets 构建一个小的 RxJS 包装器,它已经可以工作了。
但现在我想到了一个非常酷的功能,可以(希望 - 如果我错了请纠正我)使用 RxJS 轻松完成。
当前行为:
myStompWrapper.configure("/stomp_endpoint");
myStompWrapper.connect(); // onSuccess: set state to CONNECTED
// state (Observable) can be DISCONNECTED or CONNECTED
var subscription = myStompWrapper.getState()
.filter(state => state == "CONNECTED")
.flatMap(myStompWrapper.subscribeDestination("/foo"))
.subscribe(msg => console.log(msg));
// ... and some time later:
subscription.unsubscribe(); // calls 'unsubscribe' for this stomp destination
myStompWrapper.disconnect(); // disconnects the stomp websocket connection
如您所见,我必须等待state == "CONNECTED"
才能订阅subscribeDestination(..)
。否则我会从 Stomp 库中得到一个错误。
新行为:
下一个实现应该会让用户更轻松。这是我的想象:
myStompWrapper.configure("/stomp_endpoint");
var subscription = myStompWrapper.subscribeDestination("/foo")
.subscribe(msg => console.log(msg));
// ... and some time later:
subscription.unsubscribe();
它在内部应该如何工作:
configure
只能在DISCONNECTED
时调用
- 调用
subscribeDestination
时,有两种可能:- if
CONNECTED
: 只需订阅目的地 - if
DISCONNECTED
:先调用connect()
,再订阅目的地
- if
- 调用
unsubscribe
时,有两种可能:- 如果这是最后一次订阅:调用
disconnect()
- 如果这不是最后一次订阅:什么都不做
- 如果这是最后一次订阅:调用
我还不确定如何到达那里,但这就是我在这里问这个问题的原因 ;-)
提前致谢!
编辑:更多代码、示例和解释
当 configure() 在 not 断开连接时被调用时,它应该抛出一个 Error
。但这没什么大不了的。
stompClient.connect(..) 是非阻塞的。它有一个 onSuccess
回调:
public connect() {
stompClient.connect({}, this.onSuccess, this.errorHandler);
}
public onSuccess = () => {
this.state.next(State.CONNECTED);
}
observeDestination(..) 订阅一个 Stomp 消息通道(= 目的地)和 returns 一个 Rx.Observable
然后可以用来取消订阅此 Stomp 消息频道:
public observeDestination(destination: string) {
return this.state
.filter(state => state == State.CONNECTED)
.flatMap(_ => Rx.Observable.create(observer => {
let stompSubscription = this.client.subscribe(
destination,
message => observer.next(message),
{}
);
return () => {
stompSubscription.unsubscribe();
}
}));
}
可以这样使用:
myStompWrapper.configure("/stomp_endpoint");
myStompWrapper.connect();
myStompWrapper.observeDestination("/foo")
.subscribe(..);
myStompWrapper.observeDestination("/bar")
.subscribe(..);
现在我想去掉 myStompWrapper.connect()
。当第一个通过调用 observeDestination(..).subscribe(..)
进行订阅时,代码应该自动调用 this.connect()
并且当最后一个调用 unsubscribe()
.
this.disconnect()
示例:
myStompWrapper.configure("/stomp_endpoint");
let subscription1 = myStompWrapper.observeDestination("/foo")
.subscribe(..); // execute connect(), because this
// is the first subscription
let subscription2 = myStompWrapper.observeDestination("/bar")
.subscribe(..);
subscription2.unsubscribe();
subscription1.unsubscribe(); // execute disconnect(), because this
// was the last subscription
我同意您建议将代码藏到 myStompWrapper 中,在它的新家会更快乐。
我仍然建议使用 observeDestination
而不是 subscribeDestination("/foo")
这样的名称,因为您实际上并不是从该方法订阅,而只是完成您的可观察链。
时调用configure()
只能在DISCONNECTED
您没有在此处指定如果它被调用而不是
DISCONNECTED
应该发生什么。由于您似乎没有 returning 此处要使用的任何值,因此我假设您打算在异常状态不便时抛出异常。为了跟踪此类状态,我将使用以DISCONNECTED
的初始值开头的BehaviourSubject
。您可能希望将状态保持在observeDestination
以内决定是否抛出异常如果已连接:只需订阅目的地
如果 DISCONNECTED:首先调用 connect(),然后订阅目的地
正如我之前提到的,我认为如果订阅不在
subscribeDestination("/foo")
内发生,而是您只是构建您的可观察链,您会更开心。因为在某些情况下您只想调用connect()
,所以我会在包含状态条件的可观察链中简单地使用.do()
调用。要使用 rx-y 逻辑,您可能希望调用
disconnect()
作为您的可观察取消订阅的一部分,并简单地 return 一个共享的 refcounted 可观察开始.这样,每个新订阅者都不会重新创建新订阅,而是.refCount()
将对可观察链进行一次订阅,一旦下游没有更多订阅者,unsubscribe()
。
假设消息以 this.observedData$ 的形式传入 myStompWrapper
我建议的代码作为 myStompWrapper
的一部分看起来像这样:
observeDestination() {
return Rx.Observable.create(function (observer) {
var subscription = this.getState()
.filter(state => state == "CONNECTED")
.do(state => state ? this.connect() : Observable.of(true))
.switchMap(this.observedData$)
.refCount();
.subscribe(value => {
try {
subscriber.next(someCallback(value));
} catch(err) {
subscriber.error(err);
}
},
err => subscriber.error(err),
() => subscriber.complete());
return { unsubscribe() { this.disconnect(); subscription.unsubscribe(); } };
}
因为我遗漏了您的一些代码,所以我允许自己不测试我的代码。但希望它能说明并呈现我在回答中提到的概念。