使用 rjxs 和 angular 轮询请求
Polling requests using rjxs and angular
我正在尝试使用 rxjs 和 angular 创建连续轮询。下面是我的要求的实现。
我的 app.component 模板具有例如 2 个或更多组件(相同组件)。
<widget ticker='BTC'></widget>
<widget ticker='ETH'></widget>
在 widget.component 中,我想从 API 获取数据以使用代码信息填充小部件,但目标是收集所有代码并仅进行一次调用,例如(api/crypto/BTC,ETH) 和 return 所有小部件的数据(在本例中为 2)。每个小部件将从响应中读取数据并每分钟继续获取。
响应示例:
{ BTC: { name: 'Bitcoin', price: 7000 }, ETH: { name: 'Etherium', price: 200 }}
我的小部件组件:
export class widgetComponent implements OnInit, OnDestroy {
@Input() ticker: any;
subscription: any;
constructor(
private cryptoService: CryptoService
) { }
ngOnInit() {
this.subscription = this.cryptoService
.setupSymbol(this.ticker)
.subscribe(data => {
this.info = data[this.ticker];
});
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
}
我的服务:
@Injectable({
providedIn: 'root'
})
export class CryptoService {
tickers: any = '';
polledBitcoin$: Observable<number>;
load$ = new BehaviorSubject('');
constructor(
private http: HttpClient
) { }
bitcoin$ = this.http.get(`api/crypto/${this.tickers}`);
whenToRefresh$ = of('').pipe(
delay(1000),
tap(_ => this.load$.next('')),
skip(1),
);
poll$ = concat(this.bitcoin$, this.whenToRefresh$);
setupTicker(ticker) {
this.tickers += ticker + ',' ;
return this.load$.pipe(
concatMap(_ => this.poll$),
share()
);
}
我的代码没有按预期运行。每个小部件都会自己调用 API 以获得自动收报机。
但我只想调用一次收集所有代码并通过所有小部件共享数据请求。
不可能在包含所有代码的数组上使用 rxjs 创建流,例如 ['BTC', 'ETH] 然后开始轮询?轮询应该等到所有小部件都生成 setupTicker。
有人求助吗?提前致谢。
有一个代码对象跟踪有多少小部件正在跟踪每个符号,每次添加新符号时都会启动新的轮询订阅。
@Injectable({
providedIn: 'root'
})
export class CryptoService {
private tickers: { [ticker]: number } = {};
private subscription: Subscription;
tickers$ = new BehaviorSubject<{ [ticker]: { name: string, price: number } }>(undefined);
constructor(
private http: HttpClient
) { }
subscribe(ticker: string) {
if (this.tickers[ticker]) {
this.tickers[ticker]++;
} else {
this.tickers[ticker] = 1;
if (this.subscription) {
this.subscription.unsubscribe();
}
this.subscription = interval(60000).pipe(
switchMap(() => this.http.get<{ [ticker]: { name: string, price: number } }>(`api/crypto/${Object.keys(this.tickers).join(',')}`))
).subscribe(this.tickers$);
}
}
unsubscribe(ticker: string) {
if (this.tickers[ticker] > 1) {
this.tickers[ticker]--;
} else {
delete this.tickers[ticker];
if (Object.keys(this.tickers).length === 0) {
this.subscription.unsubscribe();
}
}
}
}
并在组件中
export class widgetComponent implements OnInit, OnDestroy {
@Input() ticker: string;
ticker$ = this.cryptoService.tickers$.pipe(
map(ticker => ticker && ticker[this.ticker])
);
constructor(
private cryptoService: CryptoService
) { }
ngOnInit() {
this.cryptoService.subscribe(this.ticker);
}
ngOnDestroy() {
this.cryptoService.unsubscribe(this.ticker);
}
}
并在模板中使用异步管道
<ng-content *ngIf="ticker$ | async as tickerVal">
{{ tickerVal.name }} current price is {{ tickerVal.price }}
</ng-content>
我正在尝试使用 rxjs 和 angular 创建连续轮询。下面是我的要求的实现。
我的 app.component 模板具有例如 2 个或更多组件(相同组件)。
<widget ticker='BTC'></widget>
<widget ticker='ETH'></widget>
在 widget.component 中,我想从 API 获取数据以使用代码信息填充小部件,但目标是收集所有代码并仅进行一次调用,例如(api/crypto/BTC,ETH) 和 return 所有小部件的数据(在本例中为 2)。每个小部件将从响应中读取数据并每分钟继续获取。
响应示例:
{ BTC: { name: 'Bitcoin', price: 7000 }, ETH: { name: 'Etherium', price: 200 }}
我的小部件组件:
export class widgetComponent implements OnInit, OnDestroy {
@Input() ticker: any;
subscription: any;
constructor(
private cryptoService: CryptoService
) { }
ngOnInit() {
this.subscription = this.cryptoService
.setupSymbol(this.ticker)
.subscribe(data => {
this.info = data[this.ticker];
});
}
ngOnDestroy() {
this.subscription.unsubscribe();
}
}
我的服务:
@Injectable({
providedIn: 'root'
})
export class CryptoService {
tickers: any = '';
polledBitcoin$: Observable<number>;
load$ = new BehaviorSubject('');
constructor(
private http: HttpClient
) { }
bitcoin$ = this.http.get(`api/crypto/${this.tickers}`);
whenToRefresh$ = of('').pipe(
delay(1000),
tap(_ => this.load$.next('')),
skip(1),
);
poll$ = concat(this.bitcoin$, this.whenToRefresh$);
setupTicker(ticker) {
this.tickers += ticker + ',' ;
return this.load$.pipe(
concatMap(_ => this.poll$),
share()
);
}
我的代码没有按预期运行。每个小部件都会自己调用 API 以获得自动收报机。 但我只想调用一次收集所有代码并通过所有小部件共享数据请求。
不可能在包含所有代码的数组上使用 rxjs 创建流,例如 ['BTC', 'ETH] 然后开始轮询?轮询应该等到所有小部件都生成 setupTicker。
有人求助吗?提前致谢。
有一个代码对象跟踪有多少小部件正在跟踪每个符号,每次添加新符号时都会启动新的轮询订阅。
@Injectable({
providedIn: 'root'
})
export class CryptoService {
private tickers: { [ticker]: number } = {};
private subscription: Subscription;
tickers$ = new BehaviorSubject<{ [ticker]: { name: string, price: number } }>(undefined);
constructor(
private http: HttpClient
) { }
subscribe(ticker: string) {
if (this.tickers[ticker]) {
this.tickers[ticker]++;
} else {
this.tickers[ticker] = 1;
if (this.subscription) {
this.subscription.unsubscribe();
}
this.subscription = interval(60000).pipe(
switchMap(() => this.http.get<{ [ticker]: { name: string, price: number } }>(`api/crypto/${Object.keys(this.tickers).join(',')}`))
).subscribe(this.tickers$);
}
}
unsubscribe(ticker: string) {
if (this.tickers[ticker] > 1) {
this.tickers[ticker]--;
} else {
delete this.tickers[ticker];
if (Object.keys(this.tickers).length === 0) {
this.subscription.unsubscribe();
}
}
}
}
并在组件中
export class widgetComponent implements OnInit, OnDestroy {
@Input() ticker: string;
ticker$ = this.cryptoService.tickers$.pipe(
map(ticker => ticker && ticker[this.ticker])
);
constructor(
private cryptoService: CryptoService
) { }
ngOnInit() {
this.cryptoService.subscribe(this.ticker);
}
ngOnDestroy() {
this.cryptoService.unsubscribe(this.ticker);
}
}
并在模板中使用异步管道
<ng-content *ngIf="ticker$ | async as tickerVal">
{{ tickerVal.name }} current price is {{ tickerVal.price }}
</ng-content>