将主题和观察者组合在一个订阅 RxJ 中
Combine subject and observer in a single subscribe RxJs
我有一个行为主体
private usersSubject: BehaviorSubject<ActionedUser[]> = new BehaviorSubject([]);
public usersChange: Observable<ActionedUser[]> = this.usersSubject.asObservable();
constructor() { }
set updatedUsers(users: ActionedUser[]) {
this.usersSubject.next(users);
}
和一个 Observable
getCurrentUser(): Observable<WebAuthUser> {
if (environment.production) {
return this.http.get<WebAuthUser>(`${this.webAuthURL}/wauth/api/user`, { withCredentials: true });
}
const devUser: WebAuthUser = {
userName: 'local_dev_user',
};
return of(devUser);
}
我想将它们加入到单个订阅中,但出于某种原因,当我使用 forkJoin() 并订阅它时,没有任何内容被发出
forkJoin([
this.saveUsersSubject.usersChange,
this.api.getCurrentUser(),
]);
this.getUsersDataAndLoggedUserSubscription.subscribe(([ userData, loggedUser]) => {
this.actionedUsers = userData;
this.currentUser = loggedUser;
});
我认为这是由于 Subject 造成的,但是在两者都完成后加入他们并提供价值的最佳方式是什么?
选项 1:反应式方法(IMO 优雅)
forkJoin
只会在两个 observable 都完成时发出。最快的方法是将 take(1)
添加到 BehaviorSubject
可观察对象。但这假设您不需要 BehaviorSubject
的未来排放量。如果你想对 BehaviorSubject
的每次发射做出反应,那么你可以使用 combineLatest
或 zip
(注意:它们不是同义词)而不是 forkJoin
.
import { take } from 'rxjs/operators';
forkJoin([
this.saveUsersSubject.usersChange.pipe(take(1)),
this.api.getCurrentUser()
]);
选项 2:同步方法(IMO 不优雅)
与 Subject
和 ReplaySubject
相反,BehaviourSubject
包含一个特殊的特性,即它“保存”最后一个推送给它的值。您可以随时使用 value
getter 或 getValue()
方法同步访问它(两者本质上具有相同的目的)。
private usersSubject: BehaviorSubject<ActionedUser[]> = new BehaviorSubject([]);
public usersChange: Observable<ActionedUser[]> = this.usersSubject.asObservable();
constructor() { }
set updatedUsers(users: ActionedUser[]) {
this.usersSubject.next(users);
}
public actionedUsers(): ActionedUser[] {
return this.usersSubject.value;
}
this.api.getCurrentUser().subscribe({
next: (currentUser: any) => {
this.actionedUsers = this.saveUsersSubject.actionedUsers();
this.currentUser = loggedUser;
},
error: (error: any) => {
// handle errors
}
});
我有一个行为主体
private usersSubject: BehaviorSubject<ActionedUser[]> = new BehaviorSubject([]);
public usersChange: Observable<ActionedUser[]> = this.usersSubject.asObservable();
constructor() { }
set updatedUsers(users: ActionedUser[]) {
this.usersSubject.next(users);
}
和一个 Observable
getCurrentUser(): Observable<WebAuthUser> {
if (environment.production) {
return this.http.get<WebAuthUser>(`${this.webAuthURL}/wauth/api/user`, { withCredentials: true });
}
const devUser: WebAuthUser = {
userName: 'local_dev_user',
};
return of(devUser);
}
我想将它们加入到单个订阅中,但出于某种原因,当我使用 forkJoin() 并订阅它时,没有任何内容被发出
forkJoin([
this.saveUsersSubject.usersChange,
this.api.getCurrentUser(),
]);
this.getUsersDataAndLoggedUserSubscription.subscribe(([ userData, loggedUser]) => {
this.actionedUsers = userData;
this.currentUser = loggedUser;
});
我认为这是由于 Subject 造成的,但是在两者都完成后加入他们并提供价值的最佳方式是什么?
选项 1:反应式方法(IMO 优雅)
forkJoin
只会在两个 observable 都完成时发出。最快的方法是将 take(1)
添加到 BehaviorSubject
可观察对象。但这假设您不需要 BehaviorSubject
的未来排放量。如果你想对 BehaviorSubject
的每次发射做出反应,那么你可以使用 combineLatest
或 zip
(注意:它们不是同义词)而不是 forkJoin
.
import { take } from 'rxjs/operators';
forkJoin([
this.saveUsersSubject.usersChange.pipe(take(1)),
this.api.getCurrentUser()
]);
选项 2:同步方法(IMO 不优雅)
与 Subject
和 ReplaySubject
相反,BehaviourSubject
包含一个特殊的特性,即它“保存”最后一个推送给它的值。您可以随时使用 value
getter 或 getValue()
方法同步访问它(两者本质上具有相同的目的)。
private usersSubject: BehaviorSubject<ActionedUser[]> = new BehaviorSubject([]);
public usersChange: Observable<ActionedUser[]> = this.usersSubject.asObservable();
constructor() { }
set updatedUsers(users: ActionedUser[]) {
this.usersSubject.next(users);
}
public actionedUsers(): ActionedUser[] {
return this.usersSubject.value;
}
this.api.getCurrentUser().subscribe({
next: (currentUser: any) => {
this.actionedUsers = this.saveUsersSubject.actionedUsers();
this.currentUser = loggedUser;
},
error: (error: any) => {
// handle errors
}
});