使用 RxJS 和 angular2 HTTP 服务重新连接服务器
Server reconnection using RxJS and angular 2 HTTP service
使用 RxJS 和 Angular 2 处理服务器重新连接的最佳方法是什么?
这是我目前拥有的:
import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import 'rxjs/add/operator/takeUntil';
import 'rxjs/add/observable/throw';
@Injectable()
export class ConnectionService {
// Stores the current status of the connection, false = not connected
serviceOnline: BehaviorSubject<boolean> = new BehaviorSubject(false)
constructor(
private http: Http
) {
this.serviceOnline.asObservable().subscribe((online) => {
console.log("Service online: " + JSON.stringify(online))
})
setTimeout( () => this.setServiceOffline(), 2000 );
}
setServiceOffline() {
console.log("Setting service offline");
this.serviceOnline.next(false);
this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline).subscribe();
}
testConnection(): Observable<boolean> {
console.log("Testing connection");
return new Observable(observer => {
this.http.get(`http://endpoint/ping`)
.timeout(5000)
.catch((error: any) => Observable.throw(error))
.subscribe(() => {
console.log("Connection successful");
if(this.serviceOnline.getValue() == false)
this.serviceOnline.next(true);
observer.next(true);
observer.complete();
}, (error) => {
console.log("Connection failed");
if(this.serviceOnline.getValue() == true)
this.serviceOnline.next(false);
observer.next(false);
observer.complete();
});
})
}
}
控制台结果:
Service online: false
Setting service offline
Service online: false
Testing connection
Connection failed
如果我替换这一行..
this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline).subscribe();
和
this.testConnection().delay(2000).repeat().take(5).subscribe();
我确实得到了重复:
Service online: false
Setting service offline
Service online: false
Testing connection
Connection failed
Connection failed
Connection failed
Connection failed
Connection failed
public takeUntil(notifier: Observable): Observable
Emits the values emitted by the source Observable until a notifier
Observable emits a value.
serviceOnline BehaviorSubject 只有在服务器状态发生变化时才会更新。在这一行之后...
this.serviceOnline.next(false);
...serviceOnline 仅在 http 服务返回成功响应且当前在线状态为 false(断开连接)时发出另一个值:
if(this.serviceOnline.getValue() == false)
this.serviceOnline.next(true);
所以 takeUntil 应该可以工作。我在这里 misunderstanding/missing 是什么?
我是 RxJS 的新手 - 任何其他关于改进此代码中实现的指示也将不胜感激...
这是一个 Plnkr:http://plnkr.co/edit/WUKxH6nFxc8LMKcxDPql?p=info
RxJS 版本 5.4。 Angular 版本 4.
将 serviceOnline 返回为可观察的并不能解决问题:
this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline.asObservable()).subscribe();
嗯嗯你试过这样做吗?:
this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline.asObservable()).subscribe();
我认为问题的发生是因为 this.serviceOnline
在订阅时总是发出最后一个值,因为它是 BehaviorSubject
的一个实例。当您将它作为通知程序可观察对象传递给 takeUntil
运算符时,takeUntil
会订阅它并立即收到通知,从而立即完成生成的可观察对象。请注意,通知程序可观察性是否发出 true
或 false
并不重要,只要收到任何值,takeUntil
就会完成。
试试 .takeUntil(this.serviceOnline.filter(online => online === true))
。
使用 RxJS 和 Angular 2 处理服务器重新连接的最佳方法是什么?
这是我目前拥有的:
import { Injectable } from '@angular/core';
import { Http, Response } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import 'rxjs/add/operator/takeUntil';
import 'rxjs/add/observable/throw';
@Injectable()
export class ConnectionService {
// Stores the current status of the connection, false = not connected
serviceOnline: BehaviorSubject<boolean> = new BehaviorSubject(false)
constructor(
private http: Http
) {
this.serviceOnline.asObservable().subscribe((online) => {
console.log("Service online: " + JSON.stringify(online))
})
setTimeout( () => this.setServiceOffline(), 2000 );
}
setServiceOffline() {
console.log("Setting service offline");
this.serviceOnline.next(false);
this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline).subscribe();
}
testConnection(): Observable<boolean> {
console.log("Testing connection");
return new Observable(observer => {
this.http.get(`http://endpoint/ping`)
.timeout(5000)
.catch((error: any) => Observable.throw(error))
.subscribe(() => {
console.log("Connection successful");
if(this.serviceOnline.getValue() == false)
this.serviceOnline.next(true);
observer.next(true);
observer.complete();
}, (error) => {
console.log("Connection failed");
if(this.serviceOnline.getValue() == true)
this.serviceOnline.next(false);
observer.next(false);
observer.complete();
});
})
}
}
控制台结果:
Service online: false
Setting service offline
Service online: false
Testing connection
Connection failed
如果我替换这一行..
this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline).subscribe();
和
this.testConnection().delay(2000).repeat().take(5).subscribe();
我确实得到了重复:
Service online: false
Setting service offline
Service online: false
Testing connection
Connection failed
Connection failed
Connection failed
Connection failed
Connection failed
public takeUntil(notifier: Observable): Observable
Emits the values emitted by the source Observable until a notifier Observable emits a value.
serviceOnline BehaviorSubject 只有在服务器状态发生变化时才会更新。在这一行之后...
this.serviceOnline.next(false);
...serviceOnline 仅在 http 服务返回成功响应且当前在线状态为 false(断开连接)时发出另一个值:
if(this.serviceOnline.getValue() == false)
this.serviceOnline.next(true);
所以 takeUntil 应该可以工作。我在这里 misunderstanding/missing 是什么?
我是 RxJS 的新手 - 任何其他关于改进此代码中实现的指示也将不胜感激...
这是一个 Plnkr:http://plnkr.co/edit/WUKxH6nFxc8LMKcxDPql?p=info
RxJS 版本 5.4。 Angular 版本 4.
将 serviceOnline 返回为可观察的并不能解决问题: this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline.asObservable()).subscribe();
嗯嗯你试过这样做吗?:
this.testConnection().delay(2000).repeat().takeUntil(this.serviceOnline.asObservable()).subscribe();
我认为问题的发生是因为 this.serviceOnline
在订阅时总是发出最后一个值,因为它是 BehaviorSubject
的一个实例。当您将它作为通知程序可观察对象传递给 takeUntil
运算符时,takeUntil
会订阅它并立即收到通知,从而立即完成生成的可观察对象。请注意,通知程序可观察性是否发出 true
或 false
并不重要,只要收到任何值,takeUntil
就会完成。
试试 .takeUntil(this.serviceOnline.filter(online => online === true))
。