Angular 已缓存,多播可观察,在订阅之前不要进行 HTTP 调用

Angular cached, multicast observable, don't make HTTP call until subscribed

我有一个使用可观察对象公开某些全局状态的服务。必须通过进行 HTTP 调用来初始化状态。我希望状态的所有消费者都收到相同的值,当他们订阅可观察对象时收到当前值,并在状态更改时收到更新。

这种情况通常使用 Subject 处理,可以通过将其订阅到 HTTP 请求返回的可观察对象来初始化。

但是,我不希望在第一次订阅全局状态可观察对象之前执行 HTTP 请求。如果使用 Subjectsubscribe 方法,则调用 HTTP 请求。

我在服务中有以下代码。这行得通,但似乎应该有更简单的方法来做到这一点。

  private initialized = false;
  private readonly subject = new ReplaySubject<string>(1);

  constructor(private readonly httpClient: HttpClient) {}

  get data$(): Observable<string> {
    return new Observable<string>(subscriber => {
      if (!this.initialized) {
        this.refresh();
      }
      let subscription = this.subject.subscribe(value => subscriber.next(value));
      return () => subscription.unsubscribe();
    });
  }

  refresh(): void {
    this.httpClient.get<string>('https://...')
      .pipe(
        tap(() => this.initialized = true)
      )
      .subscribe(value => this.subject.next(value));
  }

沙盒:https://codesandbox.io/s/reverent-almeida-d2zbjx?file=/src/app/test.service.ts

我不确定您的 data$() 方法中的订阅和取消订阅在做什么。这个更简单的版本(返回subject.asObservable())是否也符合您的要求?

get data$(): Observable<string> {
  if (!this.initialized) {
    this.refresh();
  }
  return this.subject.asObservable();
}

在类似情况下,我倾向于创建一个像这样工作的服务。

  1. 它公开了触发命令执行的方法,例如执行 http 调用
  2. 它公开了 Observables,客户可以在他们想要对变化做出反应时订阅这些 Observables
  3. 内部使用 Subjects 在 public Observables 执行命令时触发通知 returns 某些事情(或错误)

所以,在代码中,它看起来像这样

export class MyService {
  private _subject = new ReplySubject<any>(1);
  public data$ = this._subject.asObservable();

  constructor(private readonly httpClient: HttpClient) {}

  public executeRemoteService(inputParams: any) {
     // the request is executed as soon as a client calls this method
     // the subject notifies, errors and completes mirroring what the 
     // http client does
     this.httpClient.get<string>('https://...')
      .pipe(
         tap(this.data$)
      )
      .subscribe()
  }  
}

任何对 data$ 感兴趣的客户端,即对全局状态的变化感兴趣,都必须订阅 data$,这随时都可能发生。

什么时候触发服务的执行,任何组件都可以调用 executeRemoteService 方法的执行,然后触发 data$ 中的通知。

以下应该可以完成您想要的。 data$ 字段应该是两个 observable 的合并。第一个将是初始 http 调用,另一个将监听来自 refreshSubject 的发射以再次调用该服务。

在第一次订阅完成之前不会执行初始调用,任何对 refreshSubject 的调用也将被忽略。

最后,使用shareReplay 将使所有后续订阅者获得最后的结果。使用 refCount 参数,以便如果订阅者计数变为零,则订阅完全取消订阅,下一个订阅者将再次发起第一次调用。如果这种行为是不可取的,你可以只使用 shareReplay(1) 它总是有最后的结果并继续听 refreshSubject.

readonly refreshSubject = new subject<void>();
readonly data$ = merge(
  this.httpClient.get<string>('https://...'),
  this.refreshSubject.pipe(
    switchMap(() => this.httpClient.get<string>('https://...'))
  )
).pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);

根据@DanielGimenez 的回答,我实现了下面的解决方案。它的优点是稍微更简单,更DRY。

private readonly refreshSubject = new BehaviorSubject<void>(void 0);

constructor(private readonly httpClient: HttpClient) {
  this.data$ = this.refreshSubject
    .pipe(
        switchMap(() => this.httpClient.get<string>('https://...')),
        shareReplay(1)
    );
}

readonly data$: Observable<string>;

refresh(): void {
  this.refreshSubject.next();
}