在 rxjs 和 ngrx 中轮询
Polling in rxjs and ngrx
嘿,我是 rxjs 和 ngrx 的新手,我正在使用这些技术构建应用程序。
我正在考虑如何使用 rxjs 可观察对象和运算符创建轮询系统。
我创建了一个基本的轮询系统,其中包含可观察对象的订阅图。每个 observable 每 5 秒向 ngrx-effects 发送一个动作,ngrx-effects 处理动作并执行副作用,例如使用服务的 http 调用。
我的问题是我想为当前的池系统创建一个特定的机制,它具有以下条件:
1.The 第一个池立即发生,我为此使用计时器(0,poolingTime),
或间隔 stratwith(null).
2.The 池知道根据前一个请求的时间延迟下一个请求 request.I 意味着当前一个请求完成时,第二个请求发生。
第一个条件我一个人完成,第二个条件(2)我需要帮助才能实现。
为了完成第二个条件,我想到了 debounce 或 throttle,但正如我一开始所说的,我对 rxjs 没有太多经验。
这是我的简单池化系统的代码
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import { Subscription } from 'rxjs/Subscription';
import { timer } from 'rxjs/observable/timer';
import { interval } from 'rxjs/observable/interval';
import { throttleTime, debounceTime, startWith, tap, delay } from 'rxjs/operators';
import { Utils } from '../utils';
@Injectable()
export class PoolingService {
private subscriptions: { [id: string]: Subscription };
constructor() {
this.subscriptions = {};
}
public startPooling(time: number, callback: Function): string {
const id = Utils.guid();
const interval$ = interval(time).pipe(tap(tick => console.log("tick", tick))).pipe(startWith(null));
// const interval$ = timer(0, time).pipe(tap(tick => console.log("tick", tick)));
const subscription = interval$.subscribe(() => { callback() });
this.subscriptions[id] = subscription;
return id;
}
public stopPooling(id: string) {
const subscription = this.subscriptions[id];
if (!subscription) {
return;
}
subscription.unsubscribe();
}
}
下面是轮询服务的使用:
ngOnInit() {
this.store.select('domains').subscribe((state: any) => {
const { list, lastAddedDomain } = state;
this.markers = list;
this.roots = Utils.list_to_tree(list);
});
this.poolService.startPooling(5000, () => {
this.store.dispatch(new AllHttpActions.HttpActionGet({}, HttpMethods.GET, "/getDomainsForMap", AllDomainActions.FETCH_DOMAINS, Utils.guid()));
});
}
我可能会尝试这样的事情。我在整个代码中添加了注释,这应该有助于您理解我为什么做某些事情。
import { Injectable, OnDestroy } from '@angular/core';
import { Subject } from 'rxjs/Subject';
import { Observable } from 'rxjs/Observable';
import { timer } from 'rxjs/observable/timer';
import { interval } from 'rxjs/observable/interval';
import { startWith, tap, mergeMap, take, takeUntil, filter, map, catchError, delay } from 'rxjs/operators';
import { HttpClient } from '@angular/common/http';
import { of } from 'rxjs/observable/of';
import { Subscription } from 'rxjs/Subscription';
@Injectable()
export class PollingService implements OnDestroy {
private destroyed$ = new Subject<any>();
poll<PollResultType>(intervalTime: number, pollFunction: () => Observable<PollResultType>): Observable<any> {
let isRequesting = false;
return timer(0, intervalTime)
.pipe(
// When the service is destroyed, all polls will be unsubscribed from
takeUntil(this.destroyed$)),
tap(tick => console.log('tick', tick))),
// Only continue if isRequesting is false
filter(() => !isRequesting)),
// Set isRequesting to true before requesting data
tap(() => isRequesting = true)),
// Execute your poll function
mergeMap(pollFunction)),
// Set isRequesting to false, so the next poll can come through
tap(() => isRequesting = false)
);
}
ngOnDestroy() {
// When the service gets destroyed, all existing polls will be destroyed as well
this.destroyed$.next();
this.destroyed$.complete();
}
}
// In this example this is a service. But this could also be a component, directive etc.
@Injectable()
export class ConsumerService {
private subscription: Subscription;
private requester: Observable<any>;
constructor(private polling: PollingService, private http: HttpClient) {
// Instead of calling poll and subscribing directly we do not subscribe.
// Like that we can have a requester where we can subscribe to activate
// the polling. You might not need that.
this.requester = this.polling.poll(
500,
// This is our polling function which should return another observable
() => this.http.get('https://cors-test.appspot.com/test')
.pipe(
// Uncomment following line to add artificial delay for the request
// delay(2000),
// Don't forget to handle errors
catchError(error => {
return of('Failed');
})
)
);
// Let's activate our poll right away
this.activate();
}
activate() {
// Deactivate on activation to deactivate any subscriptions that are already running
this.deactivate();
// Subscribe to activate polling and do something with the result
this.subscription = this.requester
// This is for testing purposes. We don't want to overload the server ;)
.pipe(take(10))
.subscribe(res => console.log(res));
}
deactivate() {
if (this.subscription) {
this.subscription.unsubscribe();
this.subscription = undefined;
}
}
}
可能需要注意一些一般事项:
- 要运行这段代码你需要做以下事情:
- 将代码复制到源中的 ts 文件中。
- 将 PollingService 和 ConsumerService 添加到您的应用程序模块提供程序。
- 在某处添加 ConsumerService 作为依赖项,以便执行。
- 出于测试目的,我将轮询时间设置为 500 毫秒。
- 在ConsumerService 的构造函数中,有一个带有延迟语句的注释掉的行。如果您取消注释该行,您可以模拟如果请求执行时间更长时会发生什么。你应该在控制台看到效果,只要延迟时间比intervalTime
长
- 在 ConsumerService.activate 方法中,我将轮询限制为 10,以免打扰测试背后的服务器 url 我正在调用。
- 通过在不同步骤之间添加
tap(() => ...)
和控制台日志语句,可能有助于更好地理解正在发生的事情。
希望对您有所帮助。
嘿,我是 rxjs 和 ngrx 的新手,我正在使用这些技术构建应用程序。 我正在考虑如何使用 rxjs 可观察对象和运算符创建轮询系统。
我创建了一个基本的轮询系统,其中包含可观察对象的订阅图。每个 observable 每 5 秒向 ngrx-effects 发送一个动作,ngrx-effects 处理动作并执行副作用,例如使用服务的 http 调用。
我的问题是我想为当前的池系统创建一个特定的机制,它具有以下条件:
1.The 第一个池立即发生,我为此使用计时器(0,poolingTime), 或间隔 stratwith(null).
2.The 池知道根据前一个请求的时间延迟下一个请求 request.I 意味着当前一个请求完成时,第二个请求发生。
第一个条件我一个人完成,第二个条件(2)我需要帮助才能实现。 为了完成第二个条件,我想到了 debounce 或 throttle,但正如我一开始所说的,我对 rxjs 没有太多经验。
这是我的简单池化系统的代码
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import { Subscription } from 'rxjs/Subscription';
import { timer } from 'rxjs/observable/timer';
import { interval } from 'rxjs/observable/interval';
import { throttleTime, debounceTime, startWith, tap, delay } from 'rxjs/operators';
import { Utils } from '../utils';
@Injectable()
export class PoolingService {
private subscriptions: { [id: string]: Subscription };
constructor() {
this.subscriptions = {};
}
public startPooling(time: number, callback: Function): string {
const id = Utils.guid();
const interval$ = interval(time).pipe(tap(tick => console.log("tick", tick))).pipe(startWith(null));
// const interval$ = timer(0, time).pipe(tap(tick => console.log("tick", tick)));
const subscription = interval$.subscribe(() => { callback() });
this.subscriptions[id] = subscription;
return id;
}
public stopPooling(id: string) {
const subscription = this.subscriptions[id];
if (!subscription) {
return;
}
subscription.unsubscribe();
}
}
下面是轮询服务的使用:
ngOnInit() {
this.store.select('domains').subscribe((state: any) => {
const { list, lastAddedDomain } = state;
this.markers = list;
this.roots = Utils.list_to_tree(list);
});
this.poolService.startPooling(5000, () => {
this.store.dispatch(new AllHttpActions.HttpActionGet({}, HttpMethods.GET, "/getDomainsForMap", AllDomainActions.FETCH_DOMAINS, Utils.guid()));
});
}
我可能会尝试这样的事情。我在整个代码中添加了注释,这应该有助于您理解我为什么做某些事情。
import { Injectable, OnDestroy } from '@angular/core';
import { Subject } from 'rxjs/Subject';
import { Observable } from 'rxjs/Observable';
import { timer } from 'rxjs/observable/timer';
import { interval } from 'rxjs/observable/interval';
import { startWith, tap, mergeMap, take, takeUntil, filter, map, catchError, delay } from 'rxjs/operators';
import { HttpClient } from '@angular/common/http';
import { of } from 'rxjs/observable/of';
import { Subscription } from 'rxjs/Subscription';
@Injectable()
export class PollingService implements OnDestroy {
private destroyed$ = new Subject<any>();
poll<PollResultType>(intervalTime: number, pollFunction: () => Observable<PollResultType>): Observable<any> {
let isRequesting = false;
return timer(0, intervalTime)
.pipe(
// When the service is destroyed, all polls will be unsubscribed from
takeUntil(this.destroyed$)),
tap(tick => console.log('tick', tick))),
// Only continue if isRequesting is false
filter(() => !isRequesting)),
// Set isRequesting to true before requesting data
tap(() => isRequesting = true)),
// Execute your poll function
mergeMap(pollFunction)),
// Set isRequesting to false, so the next poll can come through
tap(() => isRequesting = false)
);
}
ngOnDestroy() {
// When the service gets destroyed, all existing polls will be destroyed as well
this.destroyed$.next();
this.destroyed$.complete();
}
}
// In this example this is a service. But this could also be a component, directive etc.
@Injectable()
export class ConsumerService {
private subscription: Subscription;
private requester: Observable<any>;
constructor(private polling: PollingService, private http: HttpClient) {
// Instead of calling poll and subscribing directly we do not subscribe.
// Like that we can have a requester where we can subscribe to activate
// the polling. You might not need that.
this.requester = this.polling.poll(
500,
// This is our polling function which should return another observable
() => this.http.get('https://cors-test.appspot.com/test')
.pipe(
// Uncomment following line to add artificial delay for the request
// delay(2000),
// Don't forget to handle errors
catchError(error => {
return of('Failed');
})
)
);
// Let's activate our poll right away
this.activate();
}
activate() {
// Deactivate on activation to deactivate any subscriptions that are already running
this.deactivate();
// Subscribe to activate polling and do something with the result
this.subscription = this.requester
// This is for testing purposes. We don't want to overload the server ;)
.pipe(take(10))
.subscribe(res => console.log(res));
}
deactivate() {
if (this.subscription) {
this.subscription.unsubscribe();
this.subscription = undefined;
}
}
}
可能需要注意一些一般事项:
- 要运行这段代码你需要做以下事情:
- 将代码复制到源中的 ts 文件中。
- 将 PollingService 和 ConsumerService 添加到您的应用程序模块提供程序。
- 在某处添加 ConsumerService 作为依赖项,以便执行。
- 出于测试目的,我将轮询时间设置为 500 毫秒。
- 在ConsumerService 的构造函数中,有一个带有延迟语句的注释掉的行。如果您取消注释该行,您可以模拟如果请求执行时间更长时会发生什么。你应该在控制台看到效果,只要延迟时间比intervalTime 长
- 在 ConsumerService.activate 方法中,我将轮询限制为 10,以免打扰测试背后的服务器 url 我正在调用。
- 通过在不同步骤之间添加
tap(() => ...)
和控制台日志语句,可能有助于更好地理解正在发生的事情。
希望对您有所帮助。