如何从 pubnub 订阅创建一个可观察对象
How to create an observable from pubnub subscribe
我正在为如何使用 rxjs 库将以下内容转换为可观察对象而苦苦挣扎。
// Build the PubNub client from the publish/subscribe key pair.
var client = PUBNUB.init({
  publish_key: "pubkey",
  subscribe_key: "subkey"
});

// Subscribe to the channel; every lifecycle callback only logs for now.
// These are the callbacks the question wants to expose as an observable.
client.subscribe({
  channel: "admin:posts",
  // Invoked for each message received on the channel.
  message: function (message, env, channel) {
    console.log("Message");
  },
  // Connection lifecycle notifications.
  connect: function () {
    console.log("Connected");
  },
  disconnect: function () {
    console.log("Disconnected");
  },
  reconnect: function () {
    console.log("Reconnected");
  },
  // Invoked when the subscription reports an error.
  error: function () {
    console.log("Network Error");
  }
});
我希望将消息回调与其他回调一起转换为可观察对象。
关于如何做到这一点有什么想法吗?
谢谢
更新 - 2015 年 4 月 29 日
这是我最终采用的实现方式。需要补充的是,我只需要在用户登录之后才订阅 pubnub,并在用户注销时进行清理。
请让我知道这是否是一个好方法:
var self = this;

/**
 * Observable of login credentials.
 *
 * Emits `{ token, email }` each time the session fires
 * 'sessionAuthenticationSucceeded', and errors when it fires
 * 'sessionAuthenticationFailed'. Disposing the subscription detaches both
 * listeners from the session.
 */
var logins = Rx.Observable.create(function (obs) {
  // Using a session manager in ember.
  var session = self.get('session');

  // Handlers are kept in variables so the very same function objects can be
  // passed to `off` on disposal; passing a fresh anonymous function to `off`
  // would not remove the listener that was registered.
  var onAuthenticated = function (e) {
    // `this` is whatever the session manager binds when it triggers the
    // event (presumably the session itself — TODO confirm).
    var data = {
      token: this.content.secure.token,
      email: this.content.secure.email
    };
    obs.onNext(data);
  };
  var onAuthenticationFailed = function (e) {
    obs.onError(e);
  };

  session.on('sessionAuthenticationSucceeded', onAuthenticated);
  session.on('sessionAuthenticationFailed', onAuthenticationFailed);

  // Disposal: detach exactly the listeners registered above.
  return function () {
    session.off('sessionAuthenticationSucceeded', onAuthenticated);
    session.off('sessionAuthenticationFailed', onAuthenticationFailed);
  };
});
/**
 * Observable of logout notifications.
 *
 * Emits (with no value) each time the session fires
 * 'sessionInvalidationSucceeded', and errors when it fires
 * 'sessionInvalidationFailed'. Disposing the subscription detaches both
 * listeners from the session.
 */
var logouts = Rx.Observable.create(function (obs) {
  var session = self.get('session');

  // Handlers are kept in variables so the very same function objects can be
  // passed to `off` on disposal; passing a fresh anonymous function to `off`
  // would not remove the listener that was registered.
  var onInvalidated = function () {
    obs.onNext();
  };
  var onInvalidationFailed = function (e) {
    obs.onError(e);
  };

  session.on('sessionInvalidationSucceeded', onInvalidated);
  session.on('sessionInvalidationFailed', onInvalidationFailed);

  // Disposal: detach exactly the listeners registered above.
  return function () {
    session.off('sessionInvalidationSucceeded', onInvalidated);
    session.off('sessionInvalidationFailed', onInvalidationFailed);
  };
});
/**
 * Shared stream of PubNub traffic for the 'admin:data' channel group.
 *
 * For every login a PubNub client is created with the user's credentials and
 * subscribed to the channel group. Each emitted item is either
 * `{ message, env, channel, error: null }` for a received message or
 * `{ error }` for a subscription error (errors are delivered as values so the
 * stream is not terminated by them). The stream ends on the first logout and
 * is shared between subscribers through publish/refCount.
 */
var dataStream = logins
  .map(function (credentials) {
    return PUBNUB.init({
      publish_key: 'pub_key',
      subscribe_key: 'sub_key',
      auth_key: credentials.token,
      uuid: credentials.email
    });
  })
  // flatMapLatest disposes the previous client's subscription (running the
  // unsubscribe below) before subscribing the new client. The inner
  // observable never completes, so concatMap would never move on to the
  // client created by a later login.
  .flatMapLatest(function (client) {
    return Rx.Observable.create(function (observer) {
      client.subscribe({
        channel_group: 'admin:data',
        message: function (message, env, channel) {
          observer.onNext({ message: message, env: env, channel: channel, error: null });
        },
        error: function (error) {
          observer.onNext({ error: error });
        }
      });

      // Disposal: leave the channel group for this client.
      return function () {
        client.unsubscribe({ channel_group: 'admin:data' });
      };
    });
  })
  .takeUntil(logouts)
  .publish()
  .refCount();
// Two independent subscribers sharing the same underlying PubNub subscription;
// each one simply logs what it receives, tagged with its own label.
var sub1 = dataStream.subscribe(function (payload) {
  console.log("sub1", payload);
});

var sub2 = dataStream.subscribe(function (payload) {
  console.log("sub2", payload);
});
当然,您可以像这样使用 .create 创建源:
// your client
var client = PUBNUB.init({
  publish_key: 'pubkey',
  subscribe_key: 'subkey'
});

/**
 * Observable of messages received on the 'admin:posts' channel.
 *
 * Each PubNub message is forwarded with onNext; a PubNub error terminates the
 * sequence with onError. Disposing the subscription unsubscribes the client
 * from the channel.
 */
var source = Rx.Observable.create(function (observer) {
  client.subscribe({
    channel: 'admin:posts',
    message: function (message) { observer.onNext(message); }, // next item
    // Forward the error that was actually received; `message` is not in
    // scope inside this callback.
    error: function (err) { observer.onError(err); }
    // other functions based on your logic, might want handling
  });

  return function () { // dispose
    // Stop receiving messages once nobody is listening any more; you might
    // want to define completion too.
    client.unsubscribe({ channel: 'admin:posts' });
  };
});
我没有使用 PUBNUB 的经验,但根据您提供的信息,我可能会这样实现它:
// your client
var client = PUBNUB.init({
  publish_key: 'pubkey',
  subscribe_key: 'subkey'
});

/**
 * Shared observable of messages received on the 'admin:posts' channel.
 *
 * The PubNub subscription is created when the first observer subscribes and
 * is torn down when the last one disposes (publish + refCount). Messages are
 * forwarded with onNext, errors terminate the sequence with onError, and the
 * connection lifecycle callbacks only log.
 */
var source = Rx.Observable.defer(function () {
  return Rx.Observable.create(function (observer) {
    client.subscribe({
      channel: 'admin:posts',
      message: function (message) { observer.onNext(message); },
      connect: function () { console.log("Connected"); },
      disconnect: function () { console.log("Disconnected"); },
      reconnect: function () { console.log("Reconnected"); },
      error: function (err) { observer.onError(err); }
    });

    return Rx.Disposable.create(function () { // dispose
      // unsubscribe needs to be told which channel to leave.
      client.unsubscribe({ channel: 'admin:posts' });
    });
  });
})
  .publish()
  .refCount();
我正在为如何使用 rxjs 库将以下内容转换为可观察对象而苦苦挣扎。
// Build the PubNub client from the publish/subscribe key pair.
var client = PUBNUB.init({
  publish_key: "pubkey",
  subscribe_key: "subkey"
});

// Subscribe to the channel; every lifecycle callback only logs for now.
// These are the callbacks the question wants to expose as an observable.
client.subscribe({
  channel: "admin:posts",
  // Invoked for each message received on the channel.
  message: function (message, env, channel) {
    console.log("Message");
  },
  // Connection lifecycle notifications.
  connect: function () {
    console.log("Connected");
  },
  disconnect: function () {
    console.log("Disconnected");
  },
  reconnect: function () {
    console.log("Reconnected");
  },
  // Invoked when the subscription reports an error.
  error: function () {
    console.log("Network Error");
  }
});
我希望将消息回调与其他回调一起转换为可观察对象。
关于如何做到这一点有什么想法吗?
谢谢
更新 - 2015 年 4 月 29 日
这是我最终采用的实现方式。需要补充的是,我只需要在用户登录之后才订阅 pubnub,并在用户注销时进行清理。
请让我知道这是否是一个好方法:
var self = this;

/**
 * Observable of login credentials.
 *
 * Emits `{ token, email }` each time the session fires
 * 'sessionAuthenticationSucceeded', and errors when it fires
 * 'sessionAuthenticationFailed'. Disposing the subscription detaches both
 * listeners from the session.
 */
var logins = Rx.Observable.create(function (obs) {
  // Using a session manager in ember.
  var session = self.get('session');

  // Handlers are kept in variables so the very same function objects can be
  // passed to `off` on disposal; passing a fresh anonymous function to `off`
  // would not remove the listener that was registered.
  var onAuthenticated = function (e) {
    // `this` is whatever the session manager binds when it triggers the
    // event (presumably the session itself — TODO confirm).
    var data = {
      token: this.content.secure.token,
      email: this.content.secure.email
    };
    obs.onNext(data);
  };
  var onAuthenticationFailed = function (e) {
    obs.onError(e);
  };

  session.on('sessionAuthenticationSucceeded', onAuthenticated);
  session.on('sessionAuthenticationFailed', onAuthenticationFailed);

  // Disposal: detach exactly the listeners registered above.
  return function () {
    session.off('sessionAuthenticationSucceeded', onAuthenticated);
    session.off('sessionAuthenticationFailed', onAuthenticationFailed);
  };
});
/**
 * Observable of logout notifications.
 *
 * Emits (with no value) each time the session fires
 * 'sessionInvalidationSucceeded', and errors when it fires
 * 'sessionInvalidationFailed'. Disposing the subscription detaches both
 * listeners from the session.
 */
var logouts = Rx.Observable.create(function (obs) {
  var session = self.get('session');

  // Handlers are kept in variables so the very same function objects can be
  // passed to `off` on disposal; passing a fresh anonymous function to `off`
  // would not remove the listener that was registered.
  var onInvalidated = function () {
    obs.onNext();
  };
  var onInvalidationFailed = function (e) {
    obs.onError(e);
  };

  session.on('sessionInvalidationSucceeded', onInvalidated);
  session.on('sessionInvalidationFailed', onInvalidationFailed);

  // Disposal: detach exactly the listeners registered above.
  return function () {
    session.off('sessionInvalidationSucceeded', onInvalidated);
    session.off('sessionInvalidationFailed', onInvalidationFailed);
  };
});
/**
 * Shared stream of PubNub traffic for the 'admin:data' channel group.
 *
 * For every login a PubNub client is created with the user's credentials and
 * subscribed to the channel group. Each emitted item is either
 * `{ message, env, channel, error: null }` for a received message or
 * `{ error }` for a subscription error (errors are delivered as values so the
 * stream is not terminated by them). The stream ends on the first logout and
 * is shared between subscribers through publish/refCount.
 */
var dataStream = logins
  .map(function (credentials) {
    return PUBNUB.init({
      publish_key: 'pub_key',
      subscribe_key: 'sub_key',
      auth_key: credentials.token,
      uuid: credentials.email
    });
  })
  // flatMapLatest disposes the previous client's subscription (running the
  // unsubscribe below) before subscribing the new client. The inner
  // observable never completes, so concatMap would never move on to the
  // client created by a later login.
  .flatMapLatest(function (client) {
    return Rx.Observable.create(function (observer) {
      client.subscribe({
        channel_group: 'admin:data',
        message: function (message, env, channel) {
          observer.onNext({ message: message, env: env, channel: channel, error: null });
        },
        error: function (error) {
          observer.onNext({ error: error });
        }
      });

      // Disposal: leave the channel group for this client.
      return function () {
        client.unsubscribe({ channel_group: 'admin:data' });
      };
    });
  })
  .takeUntil(logouts)
  .publish()
  .refCount();
// Two independent subscribers sharing the same underlying PubNub subscription;
// each one simply logs what it receives, tagged with its own label.
var sub1 = dataStream.subscribe(function (payload) {
  console.log("sub1", payload);
});

var sub2 = dataStream.subscribe(function (payload) {
  console.log("sub2", payload);
});
当然,您可以像这样使用 .create 创建源:
// your client
var client = PUBNUB.init({
  publish_key: 'pubkey',
  subscribe_key: 'subkey'
});

/**
 * Observable of messages received on the 'admin:posts' channel.
 *
 * Each PubNub message is forwarded with onNext; a PubNub error terminates the
 * sequence with onError. Disposing the subscription unsubscribes the client
 * from the channel.
 */
var source = Rx.Observable.create(function (observer) {
  client.subscribe({
    channel: 'admin:posts',
    message: function (message) { observer.onNext(message); }, // next item
    // Forward the error that was actually received; `message` is not in
    // scope inside this callback.
    error: function (err) { observer.onError(err); }
    // other functions based on your logic, might want handling
  });

  return function () { // dispose
    // Stop receiving messages once nobody is listening any more; you might
    // want to define completion too.
    client.unsubscribe({ channel: 'admin:posts' });
  };
});
我没有使用 PUBNUB 的经验,但根据您提供的信息,我可能会这样实现它:
// your client
var client = PUBNUB.init({
  publish_key: 'pubkey',
  subscribe_key: 'subkey'
});

/**
 * Shared observable of messages received on the 'admin:posts' channel.
 *
 * The PubNub subscription is created when the first observer subscribes and
 * is torn down when the last one disposes (publish + refCount). Messages are
 * forwarded with onNext, errors terminate the sequence with onError, and the
 * connection lifecycle callbacks only log.
 */
var source = Rx.Observable.defer(function () {
  return Rx.Observable.create(function (observer) {
    client.subscribe({
      channel: 'admin:posts',
      message: function (message) { observer.onNext(message); },
      connect: function () { console.log("Connected"); },
      disconnect: function () { console.log("Disconnected"); },
      reconnect: function () { console.log("Reconnected"); },
      error: function (err) { observer.onError(err); }
    });

    return Rx.Disposable.create(function () { // dispose
      // unsubscribe needs to be told which channel to leave.
      client.unsubscribe({ channel: 'admin:posts' });
    });
  });
})
  .publish()
  .refCount();