使用 rjxs 和 angular 轮询请求

Polling requests using rjxs and angular

我正在尝试使用 rxjs 和 angular 创建连续轮询。下面是我的要求的实现。

我的 app.component 模板具有例如 2 个或更多组件(相同组件)。

<widget ticker='BTC'></widget>
<widget ticker='ETH'></widget>

在 widget.component 中,我想从 API 获取数据以使用代码信息填充小部件,但目标是收集所有代码并仅进行一次调用,例如(api/crypto/BTC,ETH) 和 return 所有小部件的数据(在本例中为 2)。每个小部件将从响应中读取数据并每分钟继续获取。

响应示例:

{ BTC: { name: 'Bitcoin', price: 7000 }, ETH: { name: 'Etherium', price: 200 }}

我的小部件组件:

export class widgetComponent implements OnInit, OnDestroy {
  @Input() ticker: any;
  subscription: any;

  constructor(
    private cryptoService: CryptoService
  ) { }

  ngOnInit() {
    this.subscription = this.cryptoService
      .setupSymbol(this.ticker)
      .subscribe(data => {
        this.info = data[this.ticker];
      });
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}

我的服务:

@Injectable({
  providedIn: 'root'
})
export class CryptoService {
  tickers: any = '';
  polledBitcoin$: Observable<number>;
  load$ = new BehaviorSubject('');

  constructor(
    private http: HttpClient
  ) { }

  bitcoin$ = this.http.get(`api/crypto/${this.tickers}`);

  whenToRefresh$ = of('').pipe(
    delay(1000),
    tap(_ => this.load$.next('')),
    skip(1),
  );

  poll$ = concat(this.bitcoin$, this.whenToRefresh$);

  setupTicker(ticker) {
    this.tickers += ticker + ',' ;

    return this.load$.pipe(
      concatMap(_ => this.poll$),
      share()
    );
  }

我的代码没有按预期运行。每个小部件都会自己调用 API 以获得自动收报机。 但我只想调用一次收集所有代码并通过所有小部件共享数据请求。

不可能在包含所有代码的数组上使用 rxjs 创建流,例如 ['BTC', 'ETH] 然后开始轮询?轮询应该等到所有小部件都生成 setupTicker。

有人求助吗?提前致谢。

有一个代码对象跟踪有多少小部件正在跟踪每个符号,每次添加新符号时都会启动新的轮询订阅。

@Injectable({
  providedIn: 'root'
})
export class CryptoService {
  private tickers: { [ticker]: number } = {};
  private subscription: Subscription;

  tickers$ = new BehaviorSubject<{ [ticker]: { name: string, price: number } }>(undefined);

  constructor(
    private http: HttpClient
  ) { }

  subscribe(ticker: string) {
    if (this.tickers[ticker]) {
      this.tickers[ticker]++;
    } else {
      this.tickers[ticker] = 1;
      if (this.subscription) {
        this.subscription.unsubscribe();
      }
      this.subscription = interval(60000).pipe(
        switchMap(() => this.http.get<{ [ticker]: { name: string, price: number } }>(`api/crypto/${Object.keys(this.tickers).join(',')}`))
      ).subscribe(this.tickers$);
    }
  }

  unsubscribe(ticker: string) {
    if (this.tickers[ticker] > 1) {
      this.tickers[ticker]--;
    } else {
      delete this.tickers[ticker];
      if (Object.keys(this.tickers).length === 0) {
        this.subscription.unsubscribe();
      }
    }
  }
}

并在组件中

export class widgetComponent implements OnInit, OnDestroy {
  @Input() ticker: string;

  ticker$ = this.cryptoService.tickers$.pipe(
    map(ticker => ticker && ticker[this.ticker])
  );

  constructor(
    private cryptoService: CryptoService
  ) { }

  ngOnInit() {
    this.cryptoService.subscribe(this.ticker);
  }

  ngOnDestroy() {
    this.cryptoService.unsubscribe(this.ticker);
  }
}

并在模板中使用异步管道

<ng-content *ngIf="ticker$ | async as tickerVal">
  {{ tickerVal.name }} current price is {{ tickerVal.price }}
</ng-content>