串联和并行组合可观察对象以从多个 API 获取数据

Combining observables in series & parallel to fetch data from multiple APIs

我正在尝试检查我用 Typescript 编写的函数的有效性,与 RxJS observables 一致,它从一项服务获取一些预订,然后为每个预订获取其相应的位置和 activity另一项服务。

我写这个post只是为了验证我所写内容的有效性,并询问我是否可以更有效地完成任何事情。

let params = new HttpParams();
params = params.append('status', 'C');
params = params.append('offset', offset.toString());
params = params.append('limit', limit.toString());
return this.http.get(`${this.environment.booking.url}/my/bookings`, { params }).pipe(
    mergeMap((bookings: Booking[]) => {
        if(bookings.length > 0) {
            return forkJoin(
                bookings.map((booking: Booking) =>
                    forkJoin(
                        of(booking),
                        this.activityService.getActivity(booking.activity),
                  this.locationService.getLocation(booking.finalLocation),
                    ).pipe(
                        map((data: [ Booking, Activity, Location ]) => {
                            let booking = data[0];
                            booking.activityData = data[1];
                            booking.finalLocationData = data[2];
                            return booking;
                        })
                    )
                )
            )
        }

        return of([]);
    }),
    catchError((err: HttpErrorResponse) => throwError(err))
);

我希望此功能能够 return 预订列表以及相应的位置和 activity。但更重要的是,我想验证我所做的是正确和明智的。有什么我可以做的不同的事情来使它更清晰/更易于阅读(请不要吹毛求疵)?

关于性能,我还有一个关于性能的后续问题。鉴于预订列表具有共同的活动和位置。有没有办法只获取活动和位置而没有任何重复的 HTTP 请求?这是否已经由 RxJS 进行了处理?我可以做些什么来提高这个功能的效率吗?

这就是我使用 RxJS 解决这个问题的方法:

  1. 获取所有 Bookings
  2. 每个预订同时获取 LocationActivities

const { from, of, forkJoin, identity } = rxjs;
const { mergeMap, tap, catchError } = rxjs.operators;

const api = 'https://jsonplaceholder.typicode.com';

const endpoints = {
  bookings: () => `${api}/posts`,
  locations: (id) => `${api}/posts/${id}/comments`,
  activities: (id) => `${api}/users/${id}`
};

const fetch$ = link => from(fetch(link)).pipe(
  mergeMap(res => res.json()),
  catchError(() => from([])),
);

fetch$(endpoints.bookings()).pipe(
  mergeMap(identity),
  mergeMap(booking => forkJoin({
    booking: of(booking),
    locations: fetch$(endpoints.locations(booking.id)),
    activities: fetch$(endpoints.activities(booking.userId)),
  })),
).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.js" integrity="sha256-Nihli32xEO2dsnrW29M+krVxoeDblkRBTkk5ZLQJ6O8=" crossorigin="anonymous"></script>


注意:

  1. 反应式编程,以及更一般的声明性方法,着重于避免命令式控制流……您应该尝试在没有条件(或任何其他控制流)的情况下编写您的管道。要放弃空预订,您可以使用 filter 运算符。
  2. 避免嵌套流,因为这是以可读性为代价的。
  3. forkJoin 还采用了一个非常有用的规范对象(重载的一部分)

我不确定效率如何,但至少对我来说,它有点难读

这是我的做法:

我用了一个假人API,但我认为它与你的情况相关

const usersUrl = 'https://jsonplaceholder.typicode.com/users';
const todosUrl = 'https://jsonplaceholder.typicode.com/todos';
const userIds$ = of([1, 2, 3]); // Bookings' equivalent

userIds$
  .pipe(
    filter(ids => ids.length !== 0),
    // Flatten the array so we can avoid another nesting level
    mergeMap(ids => from(ids)),
    // `concatMap` - the order matters!
    concatMap(
      id => forkJoin(ajax(`${usersUrl}/${id}`), ajax(`${todosUrl}/${id}`))
        .pipe(
          map(([user, todo]) => ({ id, user: user.response, todo: todo.response }))
        )
    ),
   toArray()
  )
  .subscribe(console.log)

Here is a StackBlitz demo.

考虑到这一点,我将根据您的问题调整它:

this.http.get(`${this.environment.booking.url}/my/bookings`, { params }).pipe(
    filter(bookings => bookings.length !== 0),
    // Get each booking individually
    mergeMap(bookings => from(bookings)),
    concatMap(
        b => forkJoin(
            this.activityService.getActivity(b.activity),
            this.locationService.getLocation(b.finalLocation),
        )
        .pipe(
            map(([activity, location]) => ({ ...b, activity, location }))
        )
    ),
    // Getting the bookings array again
    toArray()
    catchError((err: HttpErrorResponse) => throwError(err))
);