Rxjs switchMap + toArray
Rxjs switchMap + toArray
我遇到了 rxjs 的问题。
我有一个函数可以执行此操作:
- 有组 ID 列表。在示例中:
of(['1', '2'])
- 为他们中的每一个获取聊天列表
- Return 合并的聊天列表
当执行到 toArray 时没有任何反应,没有结果。
代码
get chats$(): Observable<Chat[]> {
return of(['1', '2']).pipe(
filter(groupIds => !!groupIds && groupIds.length > 0),
switchMap(groupIds => groupIds),
switchMap(groupId => getGroupChats(groupId)), // fetch list of chats for the group id
toArray(),
map(doubleList => {
return ([] as Chat[]).concat(...doubleList); // merge chat lists
})
);
}
我也试过这个:
get chats$(): Observable<Chat[]> {
return of(['1', '2']).pipe(
filter(groupIds => !!groupIds && groupIds.length > 0),
map(groupIds =>
groupIds.map(groupId => getGroupChats(groupId))
),
switchMap(chatList$ =>
forkJoin(chatList$).pipe(
map(doubleList => {
return ([] as Chat[]).concat(...doubleList);
})
)
)
);
}
测试
测试响应是:Error: Timeout - Async callback was not invoked within 5000ms
describe("WHEN: get chats$", () => {
const CHAT_MOCK_1: Chat = {
id: "1",
};
const CHAT_MOCK_2: Chat = {
id: "2",
};
it("THEN: get chats$ should return chat list", (done) => {
service.chats$
.subscribe((data) => {
expect(data.length).toEqual(2);
expect(data[0]).toEqual(CHAT_MOCK_1);
expect(data[1]).toEqual(CHAT_MOCK_2);
done();
})
.unsubscribe();
});
});
此代码片段将获取一个 id 数组,分别获取结果并收集到一个数组中
from([1,2,3,4])
.pipe(
mergeMap(a => of(a * 10)), // send request if you need hare or any observable
toArray()
).subscribe(console.log);
最后这就是我所做的(并且有效):
- 使用简单的 Array.map 将我们的组 ID 数组转换为可观察对象列表,每个可观察对象包含该组的聊天数组。
- 使用
forkJoin
获取生成数组的每个可观察值的最终发射值。
代码
get chats$(): Observable<Chat[]> {
return this.groupsIds$.pipe(
skipUntil(this._groupsLoaded$),
switchMap((ids) => {
const chatsList: Observable<Chat[]>[] = ids.map((id) =>
this.getGroupChats$(id)
);
return forkJoin([...chatsList]).pipe(
map((list) => ([] as Chat[]).concat(...list))
);
})
)
}
我仍然对为什么这个有效而不是以前的版本有一些疑问,如果有人能解释那会很棒。
作为结论:不要连接多个 switchMap
。
我遇到了 rxjs 的问题。
我有一个函数可以执行此操作:
- 有组 ID 列表。在示例中:
of(['1', '2'])
- 为他们中的每一个获取聊天列表
- Return 合并的聊天列表
当执行到 toArray 时没有任何反应,没有结果。
代码
get chats$(): Observable<Chat[]> {
return of(['1', '2']).pipe(
filter(groupIds => !!groupIds && groupIds.length > 0),
switchMap(groupIds => groupIds),
switchMap(groupId => getGroupChats(groupId)), // fetch list of chats for the group id
toArray(),
map(doubleList => {
return ([] as Chat[]).concat(...doubleList); // merge chat lists
})
);
}
我也试过这个:
get chats$(): Observable<Chat[]> {
return of(['1', '2']).pipe(
filter(groupIds => !!groupIds && groupIds.length > 0),
map(groupIds =>
groupIds.map(groupId => getGroupChats(groupId))
),
switchMap(chatList$ =>
forkJoin(chatList$).pipe(
map(doubleList => {
return ([] as Chat[]).concat(...doubleList);
})
)
)
);
}
测试
测试响应是:Error: Timeout - Async callback was not invoked within 5000ms
describe("WHEN: get chats$", () => {
const CHAT_MOCK_1: Chat = {
id: "1",
};
const CHAT_MOCK_2: Chat = {
id: "2",
};
it("THEN: get chats$ should return chat list", (done) => {
service.chats$
.subscribe((data) => {
expect(data.length).toEqual(2);
expect(data[0]).toEqual(CHAT_MOCK_1);
expect(data[1]).toEqual(CHAT_MOCK_2);
done();
})
.unsubscribe();
});
});
此代码片段将获取一个 id 数组,分别获取结果并收集到一个数组中
from([1,2,3,4])
.pipe(
mergeMap(a => of(a * 10)), // send request if you need hare or any observable
toArray()
).subscribe(console.log);
最后这就是我所做的(并且有效):
- 使用简单的 Array.map 将我们的组 ID 数组转换为可观察对象列表,每个可观察对象包含该组的聊天数组。
- 使用
forkJoin
获取生成数组的每个可观察值的最终发射值。
代码
get chats$(): Observable<Chat[]> {
return this.groupsIds$.pipe(
skipUntil(this._groupsLoaded$),
switchMap((ids) => {
const chatsList: Observable<Chat[]>[] = ids.map((id) =>
this.getGroupChats$(id)
);
return forkJoin([...chatsList]).pipe(
map((list) => ([] as Chat[]).concat(...list))
);
})
)
}
我仍然对为什么这个有效而不是以前的版本有一些疑问,如果有人能解释那会很棒。
作为结论:不要连接多个 switchMap
。