从一个动态的 Observable 数组中将 Observable 组合成一个 Observable?
Combine Observables into a single Observable from a dynamic array of Observable?
总结:基本上我需要像 from Users where usergroupIds in [1,3,5]
.
这样的东西
给定 usergroupIds$
发出一组组 ID [1,3,5]
我想在 usergroupIds
中合并 userids
的所有用户并合并用户 ID(不同)
这是我想出的:
usergroupIds$.subscribe(usergroupIds => {
const users$s = usergroupIds.map(gid => createUsersObservable(gid))
// [users$ in group 1, users$ in group 3, users$ in group 5]
const users$ = combineLatest(...user$s).pipe(
distinct(user => user.id)
)
})
createUsersObservable = gid =>
collectionData(db.collection('users').where('groupId', '==', gid)) // rxFire firestore
取消订阅并重新订阅users$
每次更改似乎都错了?
是否可以在 RxJS 中完全表达 users$
而无需每次都在订阅中创建它?
更新:
在@FanCheung 的帮助下:
combinedUsers$ = userGroupIds$.pipe(
switchMap(userGroupIds => {
const users$s = userGroupIds.map(groupId =>
createUsersObservable(groupId))
return combineLatest(...users$s)
})
但是,由于 usersObservable 一次发出一组用户,combinedUsers$ 会产生类似 [[userA, userB], [userB, userC], [userA, userD]]
的结果,我不介意在订阅时执行额外的处理:
combinedUsers$.subscribe(combinedUsers => {
const userMap = {}
for (const users of combinedUsers)
users.forEach(user => (userMap[user.id] = user))
const uniqueUsers = Object.values(userMap)
// update UI to uniqueUsers
})
但是,有没有办法使用某种方式将 combinedUsers$
的结果展平,然后执行 distinct
运算符?
看看这项工作是否可行。基本上它会在 usergroupIds$ 发出时触发,因此您将获得新的动态可观察对象。如果您始终希望源可观察对象 usergroupdIds$
在订阅时发出最后保存的值
,则需要 shareReplay
usergroupIds$.pipe(
shareReplay(1),
switchMap(usergroupIds => {
const users$Array = usergroupIds.map(gid => createUsersObservable(gid))
// [users$ in group 1, users$ in group 3, users$ in group 5]
return forkJoin(...users$Array).pipe(
map(arr=>[].concat(...arr)),
switchMap(arr=>from(arr)),
distinct(user => user.id)
)
})
)
所以您不希望每次 usergroupIds$
发出时都向 createUsersObservable
发出单独的请求。这听起来像是 mergeScan
:
的一个很好的用例
usergroupIds$.pipe(
mergeScan((users, user) => createUsersObservable().pipe(
map(user => [user, ...users]),
), []),
);
总结:基本上我需要像 from Users where usergroupIds in [1,3,5]
.
给定 usergroupIds$
发出一组组 ID [1,3,5]
我想在 usergroupIds
中合并 userids
的所有用户并合并用户 ID(不同)
这是我想出的:
usergroupIds$.subscribe(usergroupIds => {
const users$s = usergroupIds.map(gid => createUsersObservable(gid))
// [users$ in group 1, users$ in group 3, users$ in group 5]
const users$ = combineLatest(...user$s).pipe(
distinct(user => user.id)
)
})
createUsersObservable = gid =>
collectionData(db.collection('users').where('groupId', '==', gid)) // rxFire firestore
取消订阅并重新订阅users$
每次更改似乎都错了?
是否可以在 RxJS 中完全表达 users$
而无需每次都在订阅中创建它?
更新: 在@FanCheung 的帮助下:
combinedUsers$ = userGroupIds$.pipe(
switchMap(userGroupIds => {
const users$s = userGroupIds.map(groupId =>
createUsersObservable(groupId))
return combineLatest(...users$s)
})
但是,由于 usersObservable 一次发出一组用户,combinedUsers$ 会产生类似 [[userA, userB], [userB, userC], [userA, userD]]
的结果,我不介意在订阅时执行额外的处理:
combinedUsers$.subscribe(combinedUsers => {
const userMap = {}
for (const users of combinedUsers)
users.forEach(user => (userMap[user.id] = user))
const uniqueUsers = Object.values(userMap)
// update UI to uniqueUsers
})
但是,有没有办法使用某种方式将 combinedUsers$
的结果展平,然后执行 distinct
运算符?
看看这项工作是否可行。基本上它会在 usergroupIds$ 发出时触发,因此您将获得新的动态可观察对象。如果您始终希望源可观察对象 usergroupdIds$
在订阅时发出最后保存的值
shareReplay
usergroupIds$.pipe(
shareReplay(1),
switchMap(usergroupIds => {
const users$Array = usergroupIds.map(gid => createUsersObservable(gid))
// [users$ in group 1, users$ in group 3, users$ in group 5]
return forkJoin(...users$Array).pipe(
map(arr=>[].concat(...arr)),
switchMap(arr=>from(arr)),
distinct(user => user.id)
)
})
)
所以您不希望每次 usergroupIds$
发出时都向 createUsersObservable
发出单独的请求。这听起来像是 mergeScan
:
usergroupIds$.pipe(
mergeScan((users, user) => createUsersObservable().pipe(
map(user => [user, ...users]),
), []),
);