Observable pipe not called when source Observable is updated

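I have two Observables, one of which depends on data from the other.

When the data on the first Observable changes, the second Observable should update.

Unfortunately, this does not work as I expected.

This is the main Observable (it works fine):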

export class LocaleService {
  locale$: Observable<string>;

  constructor(private router: Router) {
    this.locale$ = router.events.pipe(
      filter(evnt => evnt instanceof ActivationEnd),
      map((evnt: ActivationEnd) => evnt.snapshot.paramMap.get('locale')),
      shareReplay(1),
    );
  }
}

This is the second Observable, which should update based on the value of the first Observable (shown above):

// This type is more complex, but made it simple for demonstrating the problem
interface ICountries { code: string; }

export class DataService {
  countries$: Observable<ICountries>;

  constructor(private http: HttpClient, private localeService: LocaleService) {
    this.localeService.locale$.subscribe(locale => {
      // this is only here for debug purposes, showing it does change
      console.log('changed locale', locale);
    });

    this.countries$ = localeService.locale$.pipe(
      tap(locale => { console.log('http updated', locale); }), // called first time only, not on locale changes...
      switchMap(locale => this.http.get<ICountries>(`http://localhost:8080/countries?lang=${locale}`)),
      shareReplay(1),
    );
  }
}

In case it matters, this is how I use countries$ (again simplified to demonstrate the problem):

@Component({
  template: `<ng-container *ngIf="countries$ | async as countries">{{ countries.code }}</ng-container>`,
})
export class CountryComponent {
  countries$: Observable<ICountries>;

  constructor(private dataService: DataService) {
    this.countries$ = dataService.countries$;
  }
}

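I have been struggling with this for days, searching Stack Overflow and many other forums and asking friends who use Observables more often, but I can't seem to solve it. Any help would be much appreciated!

I'm not entirely sure, but you could try splitting the observable into two separate parts. At the moment it is a single stream, as follows: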

this.countries$ = localeService.locale$.pipe(
  tap(locale => { console.log('http updated', locale); }), // called first time only, not on locale changes...
  switchMap(locale => this.http.get<ICountries>(`http://localhost:8080/countries?lang=${locale}`)),
  shareReplay(1),
);

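Here is my suggestion. I simply used a ReplaySubject with a buffer of 1 to split the observable stream into two separate streams (the buffer of 1 replays the latest value, the same as shareReplay(1) in the question).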

LocaleService

export class LocaleService {
  localeSource = new ReplaySubject<string>(1);
  locale$ = this.localeSource.asObservable();

  constructor(private router: Router) {
    router.events.pipe(
      filter(evnt => evnt instanceof ActivationEnd),
      map((evnt: ActivationEnd) => evnt.snapshot.paramMap.get('locale'))
    ).subscribe(locale => this.localeSource.next(locale));
  }
}
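
To illustrate what the ReplaySubject with a buffer of 1 does in the LocaleService above, here is a minimal, framework-free sketch. The locale values and the `early`/`late` arrays are made up for the demonstration; only `ReplaySubject`, `asObservable`, `next`, and `subscribe` come from RxJS.

```typescript
import { ReplaySubject } from 'rxjs';

// Same shape as localeSource / locale$ in LocaleService (values are hypothetical).
const localeSource = new ReplaySubject<string>(1);
const locale$ = localeSource.asObservable();

const early: string[] = [];
const late: string[] = [];

// Subscribed before any value is pushed: sees every locale.
locale$.subscribe(locale => early.push(locale));

localeSource.next('en');
localeSource.next('de');

// Subscribed later: immediately receives the single buffered value ('de'),
// then every value pushed afterwards.
locale$.subscribe(locale => late.push(locale));

localeSource.next('fr');

console.log(early); // ['en', 'de', 'fr']
console.log(late);  // ['de', 'fr']
```

A late subscriber therefore still gets the current locale, which is the role shareReplay(1) played in the question.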

DataService

// This type is more complex, but made it simple for demonstrating the problem
interface ICountries { code: string; }

export class DataService {
  countriesSource = new ReplaySubject<ICountries>(1);
  countries$ = this.countriesSource.asObservable();

  constructor(private http: HttpClient, private localeService: LocaleService) {
    this.localeService.locale$.pipe(
      tap(locale => { console.log('http updated', locale); }),
      switchMap(locale => this.http.get<ICountries>(`http://localhost:8080/countries?lang=${locale}`))
    ).subscribe(countries => this.countriesSource.next(countries));
  }
}

CountryComponent

@Component({
  template: `<ng-container *ngIf="(countries$ | async) as countries">{{ countries?.code }}</ng-container>`,
})
export class CountryComponent {
  countries$: Observable<ICountries>;

  constructor(private dataService: DataService) {
    this.countries$ = this.dataService.countries$;
  }
}
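
The wiring of the three pieces above can be tried outside Angular with a rough sketch like the following. It rests on assumptions: `routeLocale` is a hypothetical Subject standing in for the router events, and `fetchCountries` is a hypothetical replacement for the HttpClient call that returns a canned value. It only shows how the two ReplaySubject streams chain together; it does not confirm what caused the original problem.

```typescript
import { Observable, ReplaySubject, Subject, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';

interface ICountries { code: string; }

// Hypothetical stand-in for locale changes coming from router.events.
const routeLocale = new Subject<string>();

// Hypothetical stand-in for http.get<ICountries>(...); emits synchronously.
function fetchCountries(locale: string): Observable<ICountries> {
  return of({ code: `countries-${locale}` });
}

// LocaleService part: push each locale into its own ReplaySubject.
const localeSource = new ReplaySubject<string>(1);
const locale$ = localeSource.asObservable();
routeLocale.subscribe(locale => localeSource.next(locale));

// DataService part: a second, separate stream fed by locale$.
const requested: string[] = [];
const countriesSource = new ReplaySubject<ICountries>(1);
const countries$ = countriesSource.asObservable();
locale$.pipe(
  tap(locale => requested.push(locale)), // runs for every locale change
  switchMap(locale => fetchCountries(locale)),
).subscribe(countries => countriesSource.next(countries));

// CountryComponent part: what the async pipe would receive.
const rendered: string[] = [];
countries$.subscribe(countries => rendered.push(countries.code));

routeLocale.next('en');
routeLocale.next('de');

console.log(requested); // ['en', 'de']
console.log(rendered);  // ['countries-en', 'countries-de']
```

Note that the services subscribe eagerly here, so in a real application you may want to unsubscribe when the service is destroyed.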