将主题和观察者组合在一个订阅 RxJ 中

Combine subject and observer in a single subscribe RxJs

我有一个行为主体

private usersSubject: BehaviorSubject<ActionedUser[]> = new BehaviorSubject([]);
public usersChange: Observable<ActionedUser[]> = this.usersSubject.asObservable();

constructor() { }

set updatedUsers(users: ActionedUser[]) {
    this.usersSubject.next(users);
}

和一个 Observable

getCurrentUser(): Observable<WebAuthUser> {
    if (environment.production) {
        return this.http.get<WebAuthUser>(`${this.webAuthURL}/wauth/api/user`, { withCredentials: true });
    }
    const devUser: WebAuthUser = {
        userName: 'local_dev_user',
    };

    return of(devUser);
}

我想将它们加入到单个订阅中,但出于某种原因,当我使用 forkJoin() 并订阅它时,没有任何内容被发出

 forkJoin([
        this.saveUsersSubject.usersChange,
        this.api.getCurrentUser(),
    ]);
    this.getUsersDataAndLoggedUserSubscription.subscribe(([ userData, loggedUser]) => {
        
        this.actionedUsers = userData;
        this.currentUser = loggedUser;
    });

我认为这是由于 Subject 造成的,但是在两者都完成后加入他们并提供价值的最佳方式是什么?

选项 1:反应式方法(IMO 优雅)

forkJoin 只会在两个 observable 都完成时发出。最快的方法是将 take(1) 添加到 BehaviorSubject 可观察对象。但这假设您不需要 BehaviorSubject 的未来排放量。如果你想对 BehaviorSubject 的每次发射做出反应,那么你可以使用 combineLatestzip (注意:它们不是同义词)而不是 forkJoin.

import { take } from 'rxjs/operators';

forkJoin([
  this.saveUsersSubject.usersChange.pipe(take(1)),
  this.api.getCurrentUser()
]);

选项 2:同步方法(IMO 不优雅)

SubjectReplaySubject 相反,BehaviourSubject 包含一个特殊的特性,即它“保存”最后一个推送给它的值。您可以随时使用 value getter 或 getValue() 方法同步访问它(两者本质上具有相同的目的)。

private usersSubject: BehaviorSubject<ActionedUser[]> = new BehaviorSubject([]);
public usersChange: Observable<ActionedUser[]> = this.usersSubject.asObservable();

constructor() { }

set updatedUsers(users: ActionedUser[]) {
  this.usersSubject.next(users);
}

public actionedUsers(): ActionedUser[] {
  return this.usersSubject.value;
}
this.api.getCurrentUser().subscribe({
  next: (currentUser: any) => {  
    this.actionedUsers = this.saveUsersSubject.actionedUsers();
    this.currentUser = loggedUser;
  },
  error: (error: any) => {
    // handle errors
  }
});