Angular 2 - 如何更改 RxJS Observable 的间隔
Angular 2 - How to change the interval of an RxJS Observable
我正在使用 rxJS Observable Interval 来刷新正在获取的数据。我想不出更改间隔设置的方法。我已经看到一些关于使用 Subject class 提供的 rxJS 的东西,但我无法让它工作。
我在此 plunk
中提供了一个简化示例
在 AppComponent 中我有这个方法。
getTime() {
this.timeService.getTime(this.refreshInterval)
.subscribe(t => {
this.currentTime = t;
console.log('Refresh interval is: ' + this.refreshInterval);
}
);
}
在服务组件中我目前有这段代码。
getTime(refreshInterval: number) {
return Observable.interval(refreshInterval)
.startWith(0)
.map((res: any) => this.getDate())
.catch(this.handleError)
}
有人能给我提供一个可行的例子吗?
据我从您的 plnkr 了解到,您的目标是允许用户修改计时器间隔。
如你所料,refreshInterval 的改变会改变 rxJs 声明的流:
this.timeService.getTime(this.refreshInterval)
.subscribe(t => {
this.currentTime = t;
console.log('Refresh interval is: ' + this.refreshInterval);
}
);
这是错误的。
每次更新refreshInterval,你需要:
- 取消订阅或销毁之前的流。
- 创建新流并
再次订阅
您不需要销毁并重新创建整个 Observable 流来更改 refreshInterval
。您只需要更新依赖于变化间隔的流部分。
首先简化您的服务 getTime()
,使其不负责确定输出频率。它所做的只是 return 时间:
getTime() { return (new Date()).toString(); }
现在调用代码将确定时间表。只需 3 个简单步骤:
1.一个调整到所需区间的源函数:
/** Observable waits for the current interval, then emits once */
refreshObs() {return Observable.timer(this.refreshInterval)}
2.一个使用repeat
operator连续重新执行流的可观察链:
getTime$ = Observable.of(null)
.switchMap(e=>this.refreshObs()) // wait for interval, then emit
.map(() => this.timeService.getTime()) // get new time
.repeat(); // start over
3.一个订阅触发整个事情:
ngOnInit(){
this.getTime$.subscribe(t => {
this.currentTime = t;
console.log('refresh interval = '+this.refreshInterval);
});
}
这是有效的,因为 refreshObs()
return 每次重复流时都会有一个新的可观察对象,并且新的可观察对象将根据当前设置的时间间隔等待发射。
我想在此(以及 Stack Overflow 上的其他地方)的基础上建立以前的答案。我的示例有一个通用的 RefreshService
,各种组件都可以将其用于订阅。这样,一个站点可以有一个 "Refresh every X seconds" 组件,每个组件都可以订阅。
https://plnkr.co/edit/960yztjl3dqXQD2XPSei?p=preview
该服务提供的功能 withRefresh
提供 Observable
。它利用 BehaviorSubject
,这将立即触发订阅事件。
export class RefreshService {
static interval$: BehaviorSubject<number> = new BehaviorSubject<number>(30000);
setInterval(newInterval: number){
RefreshService.interval$.next(newInterval);
}
public withRefresh() {
return RefreshService.interval$
.switchMap((int: number) => Observable
.interval(int)
.startWith(0)
);
}
}
那么任何组件都可以使用这个服务如下:
this.refreshService
.withRefresh()
.switchMap(() => /* do something on each interval of the timer */);
我正在使用 rxJS Observable Interval 来刷新正在获取的数据。我想不出更改间隔设置的方法。我已经看到一些关于使用 Subject class 提供的 rxJS 的东西,但我无法让它工作。
我在此 plunk
中提供了一个简化示例在 AppComponent 中我有这个方法。
getTime() {
this.timeService.getTime(this.refreshInterval)
.subscribe(t => {
this.currentTime = t;
console.log('Refresh interval is: ' + this.refreshInterval);
}
);
}
在服务组件中我目前有这段代码。
getTime(refreshInterval: number) {
return Observable.interval(refreshInterval)
.startWith(0)
.map((res: any) => this.getDate())
.catch(this.handleError)
}
有人能给我提供一个可行的例子吗?
据我从您的 plnkr 了解到,您的目标是允许用户修改计时器间隔。
如你所料,refreshInterval 的改变会改变 rxJs 声明的流:
this.timeService.getTime(this.refreshInterval)
.subscribe(t => {
this.currentTime = t;
console.log('Refresh interval is: ' + this.refreshInterval);
}
);
这是错误的。
每次更新refreshInterval,你需要:
- 取消订阅或销毁之前的流。
- 创建新流并 再次订阅
您不需要销毁并重新创建整个 Observable 流来更改 refreshInterval
。您只需要更新依赖于变化间隔的流部分。
首先简化您的服务 getTime()
,使其不负责确定输出频率。它所做的只是 return 时间:
getTime() { return (new Date()).toString(); }
现在调用代码将确定时间表。只需 3 个简单步骤:
1.一个调整到所需区间的源函数:
/** Observable waits for the current interval, then emits once */
refreshObs() {return Observable.timer(this.refreshInterval)}
2.一个使用repeat
operator连续重新执行流的可观察链:
getTime$ = Observable.of(null)
.switchMap(e=>this.refreshObs()) // wait for interval, then emit
.map(() => this.timeService.getTime()) // get new time
.repeat(); // start over
3.一个订阅触发整个事情:
ngOnInit(){
this.getTime$.subscribe(t => {
this.currentTime = t;
console.log('refresh interval = '+this.refreshInterval);
});
}
这是有效的,因为 refreshObs()
return 每次重复流时都会有一个新的可观察对象,并且新的可观察对象将根据当前设置的时间间隔等待发射。
我想在此(以及 Stack Overflow 上的其他地方)的基础上建立以前的答案。我的示例有一个通用的 RefreshService
,各种组件都可以将其用于订阅。这样,一个站点可以有一个 "Refresh every X seconds" 组件,每个组件都可以订阅。
https://plnkr.co/edit/960yztjl3dqXQD2XPSei?p=preview
该服务提供的功能 withRefresh
提供 Observable
。它利用 BehaviorSubject
,这将立即触发订阅事件。
export class RefreshService {
static interval$: BehaviorSubject<number> = new BehaviorSubject<number>(30000);
setInterval(newInterval: number){
RefreshService.interval$.next(newInterval);
}
public withRefresh() {
return RefreshService.interval$
.switchMap((int: number) => Observable
.interval(int)
.startWith(0)
);
}
}
那么任何组件都可以使用这个服务如下:
this.refreshService
.withRefresh()
.switchMap(() => /* do something on each interval of the timer */);