完成后 n 秒重复请求 (Angular2 - http.get)
Repeat request (Angular2 - http.get) n seconds after finished
我玩了一下angular2,过了一会儿就卡住了。
使用 http.get
可以很好地处理单个请求,但我想每 4 秒轮询一次实时数据,经过一段时间的修修补补并阅读了很多 reactivex 的东西,我最终得到了:
Observable.timer(0,4000)
.flatMap(
() => this._http.get(this._url)
.share()
.map(this.extractData)
.catch(this.handleError)
)
.share();
在 http.get
-observable 发出请求结果后,是否有一种简单 方法来启动(4 秒)间隔? (或者我最终会进入 observable-hell 吗?)
我想要的时间表:
Time(s): 0 - - - - - 1 - - - - - 2 - - - - - 3 - - - - - 4 - - - - - 5 - - - - - 6
Action: Request - - Response - - - - - - - - - - - - - - - - - - - -Request-...
Wait: | wait for 4 seconds -------------------------> |
如果方便的话可以试试间隔。调用 subscribe
会给你 Subscription
,让你可以在一段时间后取消轮询。
let observer = Observable.interval(1000 * 4);
let subscription = observer.subsscribe(x => {
this._http.get(this._url)
.share()
.map(this.extractData)
.catch(this.handleError)
});
....
// if you don't require to poll anymore..
subscription.unsubscribe();
我自己设法做到了,唯一的缺点是 http.get
不能更容易地重复。
pollData(): Observable<any> {
//Creating a subject
var pollSubject = new Subject<any>();
//Define the Function which subscribes our pollSubject to a new http.get observable (see _pollLiveData() below)
var subscribeToNewRequestObservable = () => {
this._pollLiveData()
.subscribe(
(res) => { pollSubject.next(res) }
);
};
//Subscribe our "subscription-function" to custom subject (observable) with 4000ms of delay added
pollSubject.delay(4000).subscribe(subscribeToNewRequestObservable);
//Call the "subscription-function" to execute the first request
subscribeToNewRequestObservable();
//Return observable of our subject
return pollSubject.asObservable();
}
private _pollLiveData() {
var url = 'http://localhost:4711/poll/';
return this._http.get(url)
.map(
(res) => { return res.json(); }
);
};
以下是您不能使用更直接订阅的原因:
var subscribeToNewRequestObservable = () => {
this._pollLiveData()
.subscribe(pollSubject);
};
完成 http.get
-observable 也会完成您的主题并防止它发出更多项目。
这仍然是一个 cold observable,因此除非您订阅它,否则不会发出任何请求。
this._pollService.pollData().subscribe(
(res) => { this.count = res.count; }
);
更新到 RxJS 6
import { timer } from 'rxjs';
import { concatMap, map, expand, catchError } from 'rxjs/operators';
pollData$ = this._http.get(this._url)
.pipe(
map(this.extractData),
catchError(this.handleError)
);
pollData$.pipe(
expand(_ => timer(4000).pipe(concatMap(_ => pollData$)))
).subscribe();
我正在使用 RxJS 5,但不确定 RxJS 4 等效运算符是什么。无论如何,这是我的 RxJS 5 解决方案,希望对您有所帮助:
var pollData = this._http.get(this._url)
.map(this.extractData)
.catch(this.handleError);
pollData.expand(
() => Observable.timer(4000).concatMap(() => pollData)
).subscribe();
扩展运算符将发射数据并递归地开始一个新的 Observable 每次发射
Can Nguyen 对 的小修改,以防您希望轮询延迟取决于之前的请求完成状态。
var pollData = () => request() // make request
.do(handler, errorHandler) // handle response data or error
.ignoreElements() // ignore request progress notifications
.materialize(); // wrap error/complete notif-ns into Notification
pollData() // get our Observable<Notification>...
.expand( // ...and recursively map...
(n) => Rx.Observable // ...each Notification object...
.timer(n.error ? 1000 : 5000) // ...(with delay depending on previous completion status)...
.concatMap(() => pollData())) // ...to new Observable<Notification>
.subscribe();
或者:
var pollData = () => request() // make request
.last() // take last progress value
.catch(() => Rx.Observable.of(null)); // replace error with null-value
pollData()
.expand(
(data) => Rx.Observable
.timer(data ? 5000 : 1000) // delay depends on a value
.concatMap(() => pollData()))
.subscribe((d) => {console.log(d);}); // can subscribe to the value stream at the end
我玩了一下angular2,过了一会儿就卡住了。
使用 http.get
可以很好地处理单个请求,但我想每 4 秒轮询一次实时数据,经过一段时间的修修补补并阅读了很多 reactivex 的东西,我最终得到了:
Observable.timer(0,4000)
.flatMap(
() => this._http.get(this._url)
.share()
.map(this.extractData)
.catch(this.handleError)
)
.share();
在 http.get
-observable 发出请求结果后,是否有一种简单 方法来启动(4 秒)间隔? (或者我最终会进入 observable-hell 吗?)
我想要的时间表:
Time(s): 0 - - - - - 1 - - - - - 2 - - - - - 3 - - - - - 4 - - - - - 5 - - - - - 6
Action: Request - - Response - - - - - - - - - - - - - - - - - - - -Request-...
Wait: | wait for 4 seconds -------------------------> |
如果方便的话可以试试间隔。调用 subscribe
会给你 Subscription
,让你可以在一段时间后取消轮询。
let observer = Observable.interval(1000 * 4);
let subscription = observer.subsscribe(x => {
this._http.get(this._url)
.share()
.map(this.extractData)
.catch(this.handleError)
});
....
// if you don't require to poll anymore..
subscription.unsubscribe();
我自己设法做到了,唯一的缺点是 http.get
不能更容易地重复。
pollData(): Observable<any> {
//Creating a subject
var pollSubject = new Subject<any>();
//Define the Function which subscribes our pollSubject to a new http.get observable (see _pollLiveData() below)
var subscribeToNewRequestObservable = () => {
this._pollLiveData()
.subscribe(
(res) => { pollSubject.next(res) }
);
};
//Subscribe our "subscription-function" to custom subject (observable) with 4000ms of delay added
pollSubject.delay(4000).subscribe(subscribeToNewRequestObservable);
//Call the "subscription-function" to execute the first request
subscribeToNewRequestObservable();
//Return observable of our subject
return pollSubject.asObservable();
}
private _pollLiveData() {
var url = 'http://localhost:4711/poll/';
return this._http.get(url)
.map(
(res) => { return res.json(); }
);
};
以下是您不能使用更直接订阅的原因:
var subscribeToNewRequestObservable = () => {
this._pollLiveData()
.subscribe(pollSubject);
};
完成 http.get
-observable 也会完成您的主题并防止它发出更多项目。
这仍然是一个 cold observable,因此除非您订阅它,否则不会发出任何请求。
this._pollService.pollData().subscribe(
(res) => { this.count = res.count; }
);
更新到 RxJS 6
import { timer } from 'rxjs';
import { concatMap, map, expand, catchError } from 'rxjs/operators';
pollData$ = this._http.get(this._url)
.pipe(
map(this.extractData),
catchError(this.handleError)
);
pollData$.pipe(
expand(_ => timer(4000).pipe(concatMap(_ => pollData$)))
).subscribe();
我正在使用 RxJS 5,但不确定 RxJS 4 等效运算符是什么。无论如何,这是我的 RxJS 5 解决方案,希望对您有所帮助:
var pollData = this._http.get(this._url)
.map(this.extractData)
.catch(this.handleError);
pollData.expand(
() => Observable.timer(4000).concatMap(() => pollData)
).subscribe();
扩展运算符将发射数据并递归地开始一个新的 Observable 每次发射
Can Nguyen 对
var pollData = () => request() // make request
.do(handler, errorHandler) // handle response data or error
.ignoreElements() // ignore request progress notifications
.materialize(); // wrap error/complete notif-ns into Notification
pollData() // get our Observable<Notification>...
.expand( // ...and recursively map...
(n) => Rx.Observable // ...each Notification object...
.timer(n.error ? 1000 : 5000) // ...(with delay depending on previous completion status)...
.concatMap(() => pollData())) // ...to new Observable<Notification>
.subscribe();
或者:
var pollData = () => request() // make request
.last() // take last progress value
.catch(() => Rx.Observable.of(null)); // replace error with null-value
pollData()
.expand(
(data) => Rx.Observable
.timer(data ? 5000 : 1000) // delay depends on a value
.concatMap(() => pollData()))
.subscribe((d) => {console.log(d);}); // can subscribe to the value stream at the end