rxjs:从 2 个端点加载许多并合并到单个 Observable

rxjs: load many for many from 2 endpoints and merge into single Observable

我有一个端点用于获取用户列表,另一个端点用于获取每个用户的公寓列表:

getUsers() => Observable<User[]>;
getUserApartments(userId) => Observable<Apartment[]>`

如何将两者的数据合并为一个可观察对象:

const usersWithApartments$: Observable<{ user: User, apartments: Apartment[] }[]> = getUsers().pipe(
    // users.map(user => { user, apartments: getUserApartments(user.id) }) <-- turn this pseudocode into Rxjs
);

就是这样:

const toUserWithApartmentsStream = (user: User) => getUserApartments(user.id).pipe(
  map((apartments: Apartment[]) => ({ user, apartments }))
);

const usersWithApartments$: Observable<{ user: User, apartments: Apartment[] }[]> =
  getUsers().pipe(
    switchMap((users: User[]) => users),
    concatMap((user: User) => toUserWithApartmentsStream(user)),
    toArray()
  );

现在getUserApartments会一个接一个地连续执行,我可以写mergeMap而不是concatMapgetUserApartments会并发执行。感谢 Rafi Henig 的评论

函数toUserWithApartmentsStream只是一个辅助函数,因此代码看起来更简洁。我可以将它内联写入 concatMap.

因为 getUsers() 的 return 类型是 Users[] 我假设所需的类型是 { user: User, apartments: Apartment[]> 的集合而不是单个。

请考虑以下实现:

class UserApartments {
  user: User;
  apartments: Apartment[];
}
const usersWithApartments$: Observable<UserApartments[]> = getUsers()
  .pipe(
    switchMap(users => forkJoin(
      users.map(user => getUserApartments(user.id).pipe(
        map(apartments => ({ user, apartments }))
      ))
    ))
  );