从一个动态的 Observable 数组中将 Observable 组合成一个 Observable?

Combine Observables into a single Observable from a dynamic array of Observable?

总结:基本上我需要像 from Users where usergroupIds in [1,3,5].

这样的东西

给定 usergroupIds$ 发出一组组 ID [1,3,5]

我想在 usergroupIds 中合并 userids 的所有用户并合并用户 ID(不同)

这是我想出的:

usergroupIds$.subscribe(usergroupIds => {
    const users$s = usergroupIds.map(gid => createUsersObservable(gid))
    //  [users$ in group 1, users$ in group 3, users$ in group 5]
    const users$ = combineLatest(...user$s).pipe(
        distinct(user => user.id)
    )
})

createUsersObservable = gid => 
  collectionData(db.collection('users').where('groupId', '==', gid)) // rxFire firestore

取消订阅并重新订阅users$每次更改似乎都错了?

是否可以在 RxJS 中完全表达 users$ 而无需每次都在订阅中创建它?

更新: 在@FanCheung 的帮助下:

combinedUsers$ = userGroupIds$.pipe(
        switchMap(userGroupIds => {
            const users$s = userGroupIds.map(groupId =>
                createUsersObservable(groupId))
            return combineLatest(...users$s)
        })

但是,由于 usersObservable 一次发出一组用户,combinedUsers$ 会产生类似 [[userA, userB], [userB, userC], [userA, userD]] 的结果,我不介意在订阅时执行额外的处理:

combinedUsers$.subscribe(combinedUsers => {
        const userMap = {}
        for (const users of combinedUsers) 
            users.forEach(user => (userMap[user.id] = user))
        const uniqueUsers = Object.values(userMap)

        // update UI to uniqueUsers
    })

但是,有没有办法使用某种方式将 combinedUsers$ 的结果展平,然后执行 distinct 运算符?

看看这项工作是否可行。基本上它会在 usergroupIds$ 发出时触发,因此您将获得新的动态可观察对象。如果您始终希望源可观察对象 usergroupdIds$ 在订阅时发出最后保存的值

,则需要 shareReplay
usergroupIds$.pipe(
shareReplay(1),
switchMap(usergroupIds => {
    const users$Array = usergroupIds.map(gid => createUsersObservable(gid))
    //  [users$ in group 1, users$ in group 3, users$ in group 5]
   return forkJoin(...users$Array).pipe(
        map(arr=>[].concat(...arr)),
        switchMap(arr=>from(arr)),
        distinct(user => user.id)
    )
})
)

所以您不希望每次 usergroupIds$ 发出时都向 createUsersObservable 发出单独的请求。这听起来像是 mergeScan:

的一个很好的用例
usergroupIds$.pipe(
  mergeScan((users, user) => createUsersObservable().pipe(
    map(user => [user, ...users]),
  ), []),
);

现场演示:https://stackblitz.com/edit/rxjs-8zybdu