RxJS / Angular 7 : forkjoin 与不同的回调

RxJS / Angular 7 : forkjoin with different callbacks

我对 rxjs 有点陌生(我们已经坚持使用 angularJS 很长时间了,终于摆脱了我们的技术债务)。

使用angular7,我有一个这样的请求列表。使用 forkJoin,我这样管理它:

let requests = []
for (let asset in this.assets)
{
    if(asset.enabled)
        requests.push(this.apiService.postAsset(oAsset))
}
return forkJoin(requests)

它运行良好,我收到了一系列回复。 然而 我在尝试弄清楚如何根据资产类型获得不同的回调时有点卡住了。 我想要

private saveAssets(): Observable<any>{
    let requests = []
    for (let asset in this.assets)
    {
      //asset exists already
      if(asset.id)
      {
        requests.push(this.apiService.putAsset(asset)
          .subscribe(response => {
           // do something with asset
          }))
      }
      else
      {
        requests.push(this.apiService.postAsset(asset)
          .subscribe(response => {  
            asset.id = response.id
            // do some more thing with asset
          }))
      }
    }
    return forkJoin(requests)
    //keep the forkjoin because I want to do something when everything is done
}

但这行不通。给出错误 ERROR TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable. 与调用执行请求的函数有关,看起来像这样(它在没有单个 'subscribes' 的情况下工作正常):

this.saveAssets()
    .subscribe(responses => {
        console.log(responses)
    })

任何帮助或关于要寻找的东西的提示,将不胜感激......我会分配更多时间学习 RxJS,但现在我只是想让这个小东西在一个不-太脏了。

非常感谢!

订阅 observable 将 return 订阅。但是 forkjoin 将期望可观察的数组。在您的情况下,这将是一系列订阅。对每个请求使用 map 运算符,如下所示,以解决您的问题。

let requests = []
for (let asset in this.assets)
{
  //asset exists already
  if(asset.id)
  {
    requests.push(this.apiService.putAsset(asset)
      .pipe(map(response => {
           // do something with asset or alter the response from here
          return response; 
         })
        ));
  }
  else
  {
    requests.push(this.apiService.postAsset(asset)
       .pipe(map(response => {
         // do something with asset or alter the response from here
         return response; 
      })));
  }
}
return forkJoin(requests).

The Map operator applies a function of your choosing to each item emitted by the source Observable, and returns an Observable that emits the results of these function applications.

如果您不想改变最终结果,请使用 tap 运算符。