使用管道异步总是停留在加载

using pipe async always stucks on loading

我正在使用 switchMapcombineLatest

将多个可观察对象加入到一个全局 joined$ 可观察对象中

这是组件 TS 文件:

export class ProgressComponent implements OnInit{

    user: User;
    joined$: Observable<any>;

    constructor(
        protected tasksService: TasksService,
        protected courseService: CoursesService,
        protected fieldService: FieldsService,
        protected sessionService: SessionsService,
        protected applicationService: ApplicationForSessionsService,
        protected TaskdatesService: SessionTaskDatesService,
        protected router: Router,
        private userStore: UserStore)
    {}

    ngOnInit(): void {
        this.user = this.userStore.getUser();
        this.joined$ = this.applicationService.list(1, 10, undefined, this.user.id)
            .pipe(
                switchMap(applications => {
                    const courseSessionIds = uniq(applications.map(application => application.courseSessionId))

                    return combineLatest(
                        of(applications),
                        combineLatest(
                            courseSessionIds.map(courseSessionId => this.sessionService.get(courseSessionId).pipe(
                                switchMap(session => {
                                    const courseId = session.courseId
                                    return combineLatest(
                                        of(session),
                                        combineLatest(
                                            this.courseService.get(courseId).pipe(
                                                switchMap(course => {
                                                    const fieldId = course.fieldId

                                                    return combineLatest(
                                                        of(course),
                                                        combineLatest(
                                                            this.fieldService.get(fieldId).pipe(
                                                                map (field => field)
                                                            )
                                                        )
                                                    )
                                                }),
                                                map(([course, field]) => {
                                                    return {...course, field: field.find(f => f.id == course.fieldId)}
                                                })
                                            )
                                        ),
                                                                         
                                    )
                                }),
                                map(([session, course]) =>  {
                                    return {
                                        ...session, 
                                        course: course.find(c => c.id === session.courseId)
                                    }
                                }),
                                switchMap( session => {
                                    const sessionId = session.id;

                                    return combineLatest(
                                        of(session),
                                        combineLatest(
                                            this.TaskdatesService.getBySessionId(sessionId).pipe(
                                                switchMap(dates => {
                                                    const taskDatesIds = uniq(dates.map(dt => dt.taskId));

                                                    return combineLatest(
                                                        of(dates),
                                                        combineLatest(
                                                            taskDatesIds.map(taskDateId => this.tasksService.get(taskDateId).pipe(
                                                                map(task => task)
                                                            ))
                                                        )
                                                    )
                                                }),
                                                map(([dates, task]) => {
                                                    return dates.map(date => {
                                                        return {...date, task: task.find(t => t.id === date.taskId)}
                                                    })
                                                
                                                })
                                            )
                                        )
                                    )
                                }),
                                map(([session, dates]) => {
                                    return {
                                        ...session,
                                        dates: dates.map(date => date.find(d => d.sessionId === session.id))
                                    }
                                })
                            ))
                        )
                    )
                }),
                map(([applications, session]) => {
                    return applications.map(app => {
                        return {
                            ...app,
                            session: session.find(s => s.id === app.courseSessionId)
                        }
                    })
                })
            );
    }
}

这里是 HTML 模板文件

    <ng-container *ngIf="joined$ | async; else loading; let joined">

    <div *ngFor="let application of joined">
        <div class="current-application">
            <nb-card>
                <nb-card-header>
                    <h4>{{application.session.course.name}}</h4>
                    <small><i>{{application.session.course.field.name}}</i></small>
                </nb-card-header>
                <nb-card-body>
                    <p><b>id: </b>{{application.id}}</p>
                    <p><b>applicationDate: </b>{{application.applicationDate}}</p>
                    <p><b>acceptedDate: </b>{{application.acceptedDate}}</p>
                    <hr>
                    <p><b>session Id: </b>{{application.session.id}}</p>
                    <p><b>session capacity: </b>{{application.session.capacity}}</p>
                    <p><b>session startDate: </b>{{application.session.startDate}}</p>
                    <p><b>session endDate: </b>{{application.session.endDate}}</p>
                    <hr>
                    <p><b>Course Id: </b>{{application.session.course.id}}</p>
                    <p><b>Course Id: </b>{{application.session.course.id}}</p>
                </nb-card-body>
            </nb-card>
        </div>
    </div>

</ng-container>

<ng-template #loading>
    <p>Loding ...</p>
</ng-template>

编辑: 调试后我发现当 Dates array 为空时会发生错误,因此解决方案包括应用测试日期数组的长度。问题是当我尝试设置条件时出现以下错误

Argument of type '(dates: SessionTaskDate[]) => void' is not assignable to parameter of type '(value: SessionTaskDate[], index: number) => ObservableInput'. Type 'void' is not assignable to type 'ObservableInput'.

在以下情况下触发:

switchMap(dates =>{
        if(dates.length > 0){
            dates.map(date => this.augmentDateWithTask(date))
        }
    })

这是很多异步嵌套,也许还有其他方法可以解决这个问题?

如果您尝试获取多个可观察对象并进行单个订阅(例如,能够以类似的方式对多个事件做出反应),则使用 rxjs merge。

如果您想进行大量单独订阅,例如订阅用户、foo 和 bar,然后将它们的所有值用于某些逻辑,请使用 rxjs forkJoin。

如果您表述清楚objective,提供指导会更容易,但我知道 rxjs 运算符嵌套级别不容易维护。

如果我没理解错的话,你有一系列 应用程序 this.applicationService.list 返回的 Observable 通知。

然后每个 application 都有一个 courseSessionId,您可以使用它通过 this.sessionService.get 方法获取课程会话详细信息。

然后每个 session 都有一个 courseId,您可以使用它来获取 course 详细信息this.courseService.get.

然后每个 course 都有一个 fieldId,您可以使用它来获取 field 详细信息this.fieldService.get.

现在您应该有一个 session 数组,其中还包含他们所引用的 course 的所有详细信息。

然后,对于每个 session,您需要通过 this.TaskdatesService.getBySessionId.

获取 dates

Dates 似乎是包含 taskDatesId 的对象。您收集所有 taskDatesIds 并通过 this.tasksService.get.

获取 task 详细信息

现在您拥有所有 日期 和所有 任务 ,对于您创建的每个 日期 具有 date 属性及其相关 task.

的新对象

然后你回到 session 你创建了一个新对象,其中包含所有 session 属性及其相关的 dates .

现在您有一个包含所有 session 详细信息、它所引用的 course 的所有详细信息以及 日期它有。

最后一步是为每个 application 创建一个新对象,其中包含 application 的所有属性以及“增强”会话 刚刚创建的对象。

很有逻辑。

所以,再一次,如果这是正确的理解,我会从最内在的要求向外解决问题。

第一个内部请求是从 course returns 一个 Observable 开始的,它发出一个包含所有 course[=151] 的对象=] 属性和 字段 详细信息(我们称此对象为 augmentedCourse)。这可以通过这样的方法来执行

augmentCourseWithField(course) {
  return this.fieldService.get(course.fieldId).pipe(
    map (field => {
      return {...course, field}
    })
  )
}

然后我会向外迈出一步,并创建一个方法,从 courseId 开始,returns 一个发出 augmentedCourse 的 Observable ,即包含所有 课程 字段 详细信息的对象。

fetchAugmentedCourse(courseId) {
  return this.courseService.get(courseId).pipe(
     switchMap(course => this.augmentCourseWithField(course))
  )
}

现在让我们更进一步,创建一个方法,从 session 开始,returns 一个具有 所有属性的对象]session加上augmentedCourse的属性,session指的是。我们称这个对象为 augmentedSession.

augmentSessionWithCourse(session) {
  return this.fetchAugmentedCourse(session.courseId).pipe(
    map(course => {
      return {...session, course}
    })
  )
}

现在又向外迈进了一步。我们想从 courseSessionId.

开始获取 augmentedSession
fetchAugmentedSession(courseSessionId) {
  return this.sessionService.get(courseSessionId).pipe(
     switchMap(session => this.augmentSessionWithCourse(session))
  )
}

那么,到目前为止我们取得了什么成就?我们能够创建一个 Observable,它从 courseSessionId.

开始发出 augmentedSession

尽管我们有一个 applications 的列表,但在最外层,每个应用程序都包含一个 courseSessionId。不同的 applications 可以共享相同的 course,因此具有相同的 courseSessionId。因此,获取所有 applications,创建唯一 courseSessionIds 列表,使用该列表获取所有 courses 然后分配给每个 应用程序 它的 课程 。这样我们就避免了对同一门课程多次查询后端。

可以这样实现

fetchAugmentedApplications(user) {
  return this.applicationService.list(1, 10, undefined, user.id).pipe(
    switchMap(applications => {
      const courseSessionIds = uniq(applications.map(application => application.courseSessionId));
      // create an array of Observables, each one will emit an augmentedSession
      const augSessObs = courseSessionIds.map(sId => fetchAugmentedSession(sId))
      // we can use combineLatest instead of forkJoin, but I prefer forkJoin
      return forkJoin(augSessObs).pipe(
        map(augmentedSessions => {
          return applications.map(app => {
            const appSession = augmentedSessions.find(s => s.id === app.courseSessionId);
            return {...app, session: appSession}
          });
        })
      )
    })
  )
}

使用类似的样式,您应该还可以将 日期 详细信息添加到您的 会话 。同样在这种情况下,我们从最内层的操作开始,即通过 his.tasksService.get 在给定 taskIds 数组的情况下检索 tasks 数组。

代码如下所示

fetchTasks(taskIds) {
   taskObs = taskIds.map(tId => this.tasksService.get(tId));
   return forkJoin(taskObs)
}

向外移动一步,我们可以用任务[=151]扩充日期数组的每个日期 =] 属于 日期 像这样

augmentDates(dates) {
  if (dates.length === 0) {
    return of([]);
  }
  const taskDatesIds = uniq(dates.map(dt => dt.taskId));
  return fetchTasks(taskDatesIds).pipe(
    map(tasks => {
      return dates.map(date => {
        return {...date, task: task.find(t => t.id === date.taskId)}
      })
    })
  )
}

现在我们可以像这样会话增加日期

augmentSessionWithDates(session) {
  return this.TaskdatesService.getBySessionId(session.id).pipe(
    switchMap(dates => augmentDates(dates).pipe( 
      map(augmentedDates => {
        return {...session, dates: augmentedDates};
      })
    ))
  )
}

我们现在可以完成 fetchAugmentedSession 方法,同时添加使用 dates 信息扩充 session 的部分,如下所示

fetchAugmentedSession(courseSessionId) {
  return this.sessionService.get(courseSessionId).pipe(
     switchMap(session => this.augmentSessionWithCourse(session)),
     switchMap(session => this.augmentSessionWithDates(session)),
  )
}

通过这种方式,您将逻辑分成更小的块,这些块更容易测试,并且希望更容易阅读。

我没有任何 playground 来测试代码,所以很可能其中有拼写错误。我希望逻辑足够清晰。