RxJS / Angular 7 : forkjoin 与不同的回调
RxJS / Angular 7 : forkjoin with different callbacks
我对 rxjs 有点陌生(我们已经坚持使用 angularJS 很长时间了,终于摆脱了我们的技术债务)。
使用angular7,我有一个这样的请求列表。使用 forkJoin,我这样管理它:
let requests = []
for (let asset in this.assets)
{
if(asset.enabled)
requests.push(this.apiService.postAsset(oAsset))
}
return forkJoin(requests)
它运行良好,我收到了一系列回复。
然而 我在尝试弄清楚如何根据资产类型获得不同的回调时有点卡住了。
我想要
private saveAssets(): Observable<any>{
let requests = []
for (let asset in this.assets)
{
//asset exists already
if(asset.id)
{
requests.push(this.apiService.putAsset(asset)
.subscribe(response => {
// do something with asset
}))
}
else
{
requests.push(this.apiService.postAsset(asset)
.subscribe(response => {
asset.id = response.id
// do some more thing with asset
}))
}
}
return forkJoin(requests)
//keep the forkjoin because I want to do something when everything is done
}
但这行不通。给出错误
ERROR TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
与调用执行请求的函数有关,看起来像这样(它在没有单个 'subscribes' 的情况下工作正常):
this.saveAssets()
.subscribe(responses => {
console.log(responses)
})
任何帮助或关于要寻找的东西的提示,将不胜感激......我会分配更多时间学习 RxJS,但现在我只是想让这个小东西在一个不-太脏了。
非常感谢!
订阅 observable 将 return 订阅。但是 forkjoin 将期望可观察的数组。在您的情况下,这将是一系列订阅。对每个请求使用 map 运算符,如下所示,以解决您的问题。
let requests = []
for (let asset in this.assets)
{
//asset exists already
if(asset.id)
{
requests.push(this.apiService.putAsset(asset)
.pipe(map(response => {
// do something with asset or alter the response from here
return response;
})
));
}
else
{
requests.push(this.apiService.postAsset(asset)
.pipe(map(response => {
// do something with asset or alter the response from here
return response;
})));
}
}
return forkJoin(requests).
The Map operator applies a function of your choosing to each item
emitted by the source Observable, and returns an Observable that emits
the results of these function applications.
如果您不想改变最终结果,请使用 tap 运算符。
我对 rxjs 有点陌生(我们已经坚持使用 angularJS 很长时间了,终于摆脱了我们的技术债务)。
使用angular7,我有一个这样的请求列表。使用 forkJoin,我这样管理它:
let requests = []
for (let asset in this.assets)
{
if(asset.enabled)
requests.push(this.apiService.postAsset(oAsset))
}
return forkJoin(requests)
它运行良好,我收到了一系列回复。 然而 我在尝试弄清楚如何根据资产类型获得不同的回调时有点卡住了。 我想要
private saveAssets(): Observable<any>{
let requests = []
for (let asset in this.assets)
{
//asset exists already
if(asset.id)
{
requests.push(this.apiService.putAsset(asset)
.subscribe(response => {
// do something with asset
}))
}
else
{
requests.push(this.apiService.postAsset(asset)
.subscribe(response => {
asset.id = response.id
// do some more thing with asset
}))
}
}
return forkJoin(requests)
//keep the forkjoin because I want to do something when everything is done
}
但这行不通。给出错误
ERROR TypeError: You provided an invalid object where a stream was expected. You can provide an Observable, Promise, Array, or Iterable.
与调用执行请求的函数有关,看起来像这样(它在没有单个 'subscribes' 的情况下工作正常):
this.saveAssets()
.subscribe(responses => {
console.log(responses)
})
任何帮助或关于要寻找的东西的提示,将不胜感激......我会分配更多时间学习 RxJS,但现在我只是想让这个小东西在一个不-太脏了。
非常感谢!
订阅 observable 将 return 订阅。但是 forkjoin 将期望可观察的数组。在您的情况下,这将是一系列订阅。对每个请求使用 map 运算符,如下所示,以解决您的问题。
let requests = []
for (let asset in this.assets)
{
//asset exists already
if(asset.id)
{
requests.push(this.apiService.putAsset(asset)
.pipe(map(response => {
// do something with asset or alter the response from here
return response;
})
));
}
else
{
requests.push(this.apiService.postAsset(asset)
.pipe(map(response => {
// do something with asset or alter the response from here
return response;
})));
}
}
return forkJoin(requests).
The Map operator applies a function of your choosing to each item emitted by the source Observable, and returns an Observable that emits the results of these function applications.
如果您不想改变最终结果,请使用 tap 运算符。