Angular 9 与 RxJs - 结合 2 个可观察结果产生数据,但异步管道总是看到空结果

Angular 9 with RxJs - combining 2 observables results in data but async pipe always sees a null result

我有 2 个来自 Firebase 的 return 数据的可观察对象,无论我使用什么类型的组合运算符(forkJoin、CombineLatest 等),异步管道都不会接收数据。

我有一个 AuthService,它有一个从 Firebase 实时数据库初始化用户的方法:

  private initUser(firebaseUser: firebase.User) {
    const myUser: User = {account: {}, profile: {}};
    const userId = `US_${firebaseUser.uid}`;

    const accountSnapshot: Observable<UserAccount> = this.db
      .object(`/users/accounts/${userId}`)
      .valueChanges()
      .pipe(first()) as Observable<UserAccount>;

    const profileSnapshot: Observable<UserProfile> = this.db
      .object(`/users/profiles/${userId}`)
      .valueChanges()
      .pipe(first()) as Observable<UserProfile>;

    forkJoin([accountSnapshot, profileSnapshot])
      .subscribe((result: [UserAccount, UserProfile]) => {
        if (result) {
          myUser.account = result[0];
          myUser.profile = result[1];
          this.setCurrentUser(myUser);
        }
      });
  }

CurrentUser 设置为 getter/setter 的主题:

  private currentUser = new Subject<User>();

  public get getCurrentUser(): Observable<User> {
    return this.currentUser.asObservable();
  }

  public setCurrentUser(user: User): void {
    this.currentUser.next(user);
  }

在消费组件中:

  currentUser$: Observable<User>;
  user: User;


  ngOnInit(): void {
    this.currentUser$ = this.authService.getCurrentUser;
    this.currentUser$.subscribe(user => {
      this.user = user;
    });
  }

并在模板中:

<p>Email: {{ ((currentUser$ | async)?.account).email }}</p>. <- ** DOES NOT WORK **
<p>Email: {{ (user?.account).email }}</p> <- ** DOES WORK **

现在,这是奇怪的部分。如果我将 initUser 方法更改为不合并来自 firebase 的可观察对象,而是单独获取它们(并将用户配置文件和帐户设置为单独的操作),则 async 有效。像这样:

accountSnapshot.subscribe((account: UserAccount) => {
  if (account) {
    myUser.account = account;
    this.setCurrentUser(myUser);
  }
});
profileSnapshot.subscribe((profile: UserProfile) => {
  if (profile) {
    myUser.profile = profile;
    this.setCurrentUser(myUser);
  }
});

然后异步管道工作。

我试过将 forkJoin 更改为 CombineLatestzip。我也试过删除 pipe(first())async 管道 never 工作。

这是为什么?

异步或我的代码有问题吗,或者我对异步工作原理的理解...?

这是因为 Subject 在您的视图准备就绪之前触发。您应该将其更改为 ReplaySubject,以便在订阅时重播最新值。

private currentUser = new ReplaySubject<User>(1);

(题外)

除此之外,使用 initUser 中的嵌套 subscribe 似乎有点奇怪。对我来说,您可以将代码简化为以下内容:

private readonly initUser$ = new Subject<firebase.User>();

readonly currentUser$: Observable<User> = this.initUser$.pipe(
  map((user) => `US_${user.uid}`),
  switchMap((userId) => combineLatest([
    this.db.object<UserAccount>(`/users/accounts/${userId}`).valueChanges(),
    this.db.object<UserProfile>(`/users/profiles/${userId}`).valueChanges()
  ])),
  map(([ account, profile ]) => ({ account, profile } as User)),
  shareReplay(1)
);

private initUser(firebaseUser: firebase.User) {
  this.initUser$.next(firebaseUser);
}

在您的消费组件中,您可以使用服务 currentUser$ observable 来获取用户详细信息

它不起作用,因为您已经订阅了那个 forkjoin :-

将其更改为:-

this.currentUser$ = forkJoin([accountSnapshot, profileSnapshot])
      .pipe(map((result: [UserAccount, UserProfile]) => {
        if (result) {
          myUser.account = result[0];
          myUser.profile = result[1];
          this.setCurrentUser(myUser);
        }
      }));

然后在您的模板中将此 currentUser$ 与 asyncpipe 结合使用。