如何在没有嵌套订阅的情况下使用 rxjs 处理内部订阅数组

How to handle an array of inner subscriptions with rxjs without nested subscriptions

我已经this example on Stackblitz准备好我的问题了。在这个 Angular 应用程序中,我有一个 NestedSolutionComponent 我当前的工作解决方案和一个 AppComponent,我想通过使用适当的 rxjs 操作来获得相同的结果。

对于一个稍微复杂一点的真实示例,我正在寻找一种解决方案来将我的多个内部订阅的结果映射到我的外部订阅的数组。

对用户服务的 REST 调用为我提供了这个数组:

  [
    {
      id: 1,
      name: 'User One',
      groupIds: [1, 3]
    },
    {
      id: 2,
      name: 'User Two',
      groupIds: [2, 3, 4]
    },
  ]

对于每个组,我想调用一个 REST 组服务,该服务为我提供有关用户组的更多信息。总而言之,我调用了 5 次群组服务,群组数组中的每个 ID 都被调用了一次。完成后,结果应映射到组数组中 - 但不是只有 ID,整个对象应存储到数组中。

解决方案应如下所示:

  [
    {
      id: 1
      name: 'User One'
      groups: [
        { id: 1, name: 'Group One' },
        { id: 3, name: 'Group Three' }
      ] 
    },
    {
      id: 2
      name: 'User Two'
      groups: [
        { id: 2, name: 'Group Two' },
        { id: 3, name: 'Group Three' },
        { id: 4, name: 'Group Four' }
      ] 
    }
  ]

通过嵌套订阅,解决方案很简单 - 但很难看。我先调用用户服务,然后调用每个用户每个组:

    this.appService.getUsers().subscribe((users) => {
      const _usersWithGroupNames = users.map((user) => {
        const userWithGroupNames = {
          id: user.id,
          name: user.name,
          groups: [],
        } as UserWithGroupNames;
        user.groupIds.forEach((groupId) => {
          this.appService.getGroupById(groupId).subscribe((groupWithName) => {
            userWithGroupNames.groups.push({
              id: groupWithName.id,
              name: groupWithName.name,
            });
          });
        });
        return userWithGroupNames;
      });
      this.usersWithGroupNames.next(_usersWithGroupNames); // Subject
    });

我已经花了好几个小时,但我真的没有看到任何正确的 rxjs 运算符的解决方案。我尝试了 switchMapmergeMap,但以一堆嵌套的地图操作而告终。 forkJoin 似乎也帮不了我,因为我收到一个数组,我必须按特定顺序调用内部订阅。当我在管道中调用多个 mergeMaps 时,我无法访问以前的值。我想要这样的解决方案

// not real code, just dummy code
userService.pipe(
  xmap(users => generateUsersWithEmptyGroupArray()),
  ymap(users => users.groups.forEach(group => groupService.getGroup(group)),
  zmap((user, groups) => mapUserWithGroups(user, groups)) // get single user with all group information
).subscribe(usersWithGroups => this.subject.next(usersWithGroups))

这里有人知道我的问题的正确且可读的解决方案吗?

非常感谢!

不确定您尝试了什么,但这就是我构建此流的方式。它是 switchMap (也可以是 mergeMapconcatMap,在这种情况下应该无关紧要)forkJoin (不确定为什么它对您不起作用,但根据我所见应该是这样),以及 map 来创建具有组名的最终用户。

如果您有任何疑问,我很乐意通过一些说明来更新此答案。

interface User {
  id: number,
  name: string,
  groupIds: number[]
}

interface UserWithGroupNames {
  id: number,
  name: string,
  groups: any[]
}

class ArbiratryClassName {

  public usersWithGroupNames$: Observable<UserWithGroupNames[]>;

  embellishUser(user: User): Observable<UserWithGroupNames> {

    // forkJoin to get all group names
    return forkJoin(
      user.groupIds.map(groupId => 
        this.appService.getGroupById(groupId)
      )
    ).pipe(
      // map to create final UserWithGroupNames
      map(groups => ({
        id: user.id,
        name: user.name,
        groups
      }) as UserWithGroupNames)
    );
    
  }

  arbiratryInit(): void {

    // instead of this.usersWithGroupNames Subject, create
    // the stream directly as usersWithGroupNames$, and subscribe to
    // usersWithGroupNames$ whereever you'd use the subject.

    this.usersWithGroupNames$ = this.appService.getUsers().pipe(

      switchMap((users: User[]) => 
        forkJoin(users.map(u => this.embellishUser(u)))
      )

    );
  }
}

第一种方法:使用switchMap, mergeMap, from, forkJoin

this.appService
  .getUsers()
  .pipe(
    switchMap((users) =>
        // for each user
      from(users).pipe(
       // merge map to run parallel for each user
        mergeMap(({ groupIds, ...user }) =>
        // wait to retrive all group details of current user at mergeMap
        // after completing use map to map user with retrived group 
          forkJoin(
            groupIds.map((id) => this.appService.getGroupById(id))
          ).pipe(map((groups) => ({ ...user, groups })))
        )
      )
    )
  )
  .subscribe((result) => {
    console.log(result);
  });

Demo

在上面的代码中,forkJoin 将等待获取特定用户的所有 groupIds 详细信息,并且如果他已经检索到第一个用户的组 ID 3,它将再次检索 groupId 3 用户 2 的详细信息,依此类推。简而言之,重复组,将检索详细信息。

第二种方法:下面是我们将从用户数组中获取所有 groupsIds 的方法,使它们唯一,并行获取它们的所有详细信息,以及最后,我们将通过 groupIds 将组详细信息映射到用户,这里我们不会等待每个用户组 id 详细信息都被检索,也不会检索重复的组详细信息。

this.appService
.getUsers()
.pipe(
    switchMap((users) =>
      // get all unique groupIds of all users
      from(this.extractUniqueGroupIds(users)).pipe(
        // parallell fetch all group details
        mergeMap((groupId) => this.appService.getGroupById(groupId)),
        // wait to to complete all requests and generate array out of it
        reduce((acc, val) => [...acc, val], []),
        // to check retrived group details
        // tap((groups) => console.log('groups retrived: ', groups)),
        // map retrived group details back to users
        map((groups) => this.mapGroupToUsers(users, groups))
      )
    )
)
.subscribe((result) => {
    console.log(result);
    // this.usersWithGroupNames.next(result);
});

private mapGroupToUsers(users: User[], groups: Group[]): UserWithGroup[] {
    return users.map(({ groupIds, ...user }) => ({
        ...user,
        groups: groupIds.map((id) => groups.find((g) => g.id === id)),
    }));
}

private extractUniqueGroupIds(users: User[]): number[] {
    const set = users.reduce((acc, { groupIds }) => {
        groupIds.forEach((id) => acc.add(id));
            return acc;
    }, new Set<number>());

    return [...set];
}

interface UserWithGroup {
  id: number;
  name: string;
  groups: any[];
}

Demo