有什么方法可以使用 RxJS 将 n 个 http 请求链接在一起吗?

Is there any way to chain n amount of http requests together using RxJS?

在下面的代码中,我有一个 User 对象,它可以关联数量可变的帐户 ID。当用户更新时,应该为用户对象中包含的每个帐户发出额外请求:

class User {

  id: number;

  accountIds: Array<number>;
}

// Side effects class
@Effect()
update$ = this.actions$
  .ofType('USER_UPDATE')
  .switchMap(action => {

    let user = action.payload;

    for (let accountId of user.accountIds) {
      let account = { id: accountId, userIds: [user.id] };

      // Is there any way to chain these api calls with switch maps
      // and then finally switch map to the user request below?
      return this.apiService.post(`/accounts/${account.id}`, account);
    }

    return this.apiService.post(`/users/${userid}`, user);
  }

有没有什么方法可以使用 RxJS 通过 switchMap(或类似的东西)将这些调用链接在一起,以便在所有后续请求完成之前,observable 不会被认为是完整的 无需编写某种自定义递归逻辑?

我想你正在寻找 Observable.concat

From the docs:

concat joins multiple Observables together, by subscribing to them one at a time and merging their results into the output Observable. You can pass either an array of Observables, or put them directly as arguments. Passing an empty array will result in Observable that completes immediately.

concat will subscribe to first input Observable and emit all its values, without changing or affecting them in any way. When that Observable completes, it will subscribe to then next Observable passed and, again, emit its values. This will be repeated, until the operator runs out of Observables. When last input Observable completes, concat will complete as well. At any given moment only one Observable passed to operator emits values. If you would like to emit values from passed Observables concurrently, check out merge instead, especially with optional concurrent parameter. As a matter of fact, concat is an equivalent of merge operator with concurrent parameter set to 1.

试试这个:

update$ = this.actions$
    .ofType('USER_UPDATE')
    .pluck('payload')
    .switchMap(user =>      
        Observable.from(user.accountIds)
            .concatMap(accountId => 
                this.apiService.post(`/accounts/${accountId}`, {id: accountId, userIds: [user.id]})
            )
            .concat(Observable.defer(() => this.apiService.post(`/users/${user.id}`, user)))
    );

这是一个硬编码数据的例子: https://jsfiddle.net/96x907ae/1/

如果你只想用 Observable 来处理它(这似乎是一个效果中的好主意),你可以这样做:

const { Observable } = Rx;

// DATA
const user = {
  id: 'currentUserId',
  accountIds: [
    'accountId0',
    'accountId1',
    'accountId2',
    'accountId3'
  ]
}

// MOCK BACKEND CALLS
const apiService = {
  post: (url, data) => {
    return Observable.of(`some response for ${url}`).delay(1000);
  }
};

// HANDLE THE MULTIPLE HTTP CALLS
Observable
  .from(user.accountIds)
  .map(accountId => ({ id: accountId, userIds: [user.id] }))
  .map(account => apiService.post(`/accounts/${account.id}`, account))
  .concatAll()
  .do(console.log)
  .last()
  .do(_ => console.log('Finished to chain the HTTP calls for accounts'))
  .switchMap(_ => apiService.post(`/users/${user.id}`, user))
  .do(console.log)
  .subscribe();

输出为:

我制作了一个 Plunkr,您可以看到它的实际效果:
https://plnkr.co/edit/xFWXyAJM6qm0XwMf4tmv?p=preview


这里有趣的部分是我们如何处理调用。
让我为您提供更多详细信息:

from 允许我们将每个 accountIds 一个接一个地发送到 observable :

  .from(user.accountIds)

然后我们可以为每个 accountId 构建我们要发送到后端的对象:

  .map(accountId => ({ id: accountId, userIds: [user.id] }))

完成后,我们创建一个 cold Observable,一旦我们订阅它就会进行 HTTP 调用:

  .map(account => apiService.post(`/accounts/${account.id}`, account))

感谢 concatAll,我们拥有您所期望的行为:逐一进行每个 HTTP 调用:

  .concatAll()

显示 HTTP 调用的响应:

  .do(console.log)

那么现在,一旦对 accounts 的每个请求都完成,我们想向 users 发出最后一个请求。 last 将帮助我们等待最后一个(这里是 captain obvious): 。最后的() 只需记录对 accounts 的每个请求都已完成

  .do(_ => console.log('Finished to chain the HTTP calls for accounts'))

users 进行最终 HTTP 调用 .switchMap(_ => apiService.post(/users/${user.id}, 用户)) 输出响应

  .do(console.log)

当然我们需要订阅,因为这是一个冷的 Observable,否则什么也不会发生。 * .subscribe();

*:在您的效果中,您可能只想 return Observable,因为 ngrx/effects 会为您订阅它;)