RxJs Angular 多重同步 http webcalls
RxJs Angular multiply sync http webcalls
我需要依次发起多个网络请求:只有当前一个请求返回的状态为 OK 时,才接着发起下一个。
下面的代码很难看,但可以正常工作:
// Sequential chain: each request is subscribed to from inside the previous
// request's callback, and only when that response reported status 'OK'.
// Any other status is logged and the chain ends there.
// Loose (in)equality is kept on purpose so the checks behave exactly as before.
this.api.post1().subscribe((first: any) => {
  if (first.status != 'OK') {
    console.log('error: ' + first.status);
    return;
  }
  this.api.post2().subscribe((second: any) => {
    if (second.status != 'OK') {
      console.log('error: ' + second.status);
      return;
    }
    this.api.post3().subscribe((third: any) => {
      if (third.status != 'OK') {
        console.log('error: ' + third.status);
        return;
      }
      this.api.post4().subscribe((fourth: any) => {
        if (fourth.status != 'OK') {
          console.log('error: ' + fourth.status);
          return;
        }
        // Do something
      });
    });
  });
});
我尝试使用 concatMap
// Build the four request observables up front.
const post1$ = this.api.post1();
const post2$ = this.api.post2();
const post3$ = this.api.post3();
const post4$ = this.api.post4();

// Emit the request observables in order; concatMap subscribes to each one
// only after the previous one has completed, and every response is logged.
const requests$ = from([post1$, post2$, post3$, post4$]);
requests$
  .pipe(concatMap(request$ => request$))
  .subscribe(response => console.log(response));
但我需要检查每个响应的状态是否为 OK。有什么好的解决办法吗?
如果任何一个响应的状态不正常,您可以像下面这样从流中抛出一个通用错误。
// Build the four request observables up front.
const post1$ = this.api.post1();
const post2$ = this.api.post2();
const post3$ = this.api.post3();
const post4$ = this.api.post4();

// concat takes the observables as separate arguments and subscribes to them
// one after another. Passing a single array instead would make concat emit
// the observables themselves as values, so `data.status` would always be
// undefined and the very first emission would be reported as an error.
concat(post1$, post2$, post3$, post4$).pipe(
  switchMap(data => {
    // A non-"OK" response errors the stream, which also prevents concat from
    // subscribing to the remaining requests.
    if (data.status !== "OK") {
      return throwError("Some error");
    }
    return of(data);
  })
).subscribe({
  next: res => console.log(res),
  // Without an error handler the thrown error would surface as an unhandled error.
  error: err => console.error(err),
});
或者:如果您需要知道具体是哪个 observable 出了错,仍然可以使用 concat,但要在把各个请求交给 concat 之前,先让每个请求的响应经过各自的状态检查。
/**
 * Creates a pipeable operator for one request.
 *
 * Responses whose `status` is "OK" are passed through unchanged; any other
 * status errors the stream with "Some error about " followed by `type`.
 *
 * @param type Label identifying the request in the error text.
 * @returns A function that takes the response observable and returns the checked one.
 */
const handleResponse = (type: string) => {
  return (responseObs) => {
    const checkStatus = switchMap(data =>
      data.status !== "OK" ? throwError("Some error about " + type) : of(data)
    );
    return responseObs.pipe(checkStatus);
  };
};
// Attach a labelled status check to every request so a failure names its source.
const post1$ = this.api.post1().pipe(handleResponse("Post 1"));
const post2$ = this.api.post2().pipe(handleResponse("Post 2"));
const post3$ = this.api.post3().pipe(handleResponse("Post 3"));
const post4$ = this.api.post4().pipe(handleResponse("Post 4"));

// The observables must be passed as separate arguments: given a single array,
// concat would emit the four observables as plain values and never subscribe
// to them, so no request would run through its status check.
concat(post1$, post2$, post3$, post4$).subscribe({
  next: res => console.log(res),
  // An errored request stops the chain; report it instead of leaving it unhandled.
  error: err => console.error(err),
});
这是另一种方法:
import { concat, of } from 'rxjs';
import { delay } from 'rxjs/operators';
// Stand-ins for the four HTTP calls: each emits one { status, data } object
// synchronously. The third one carries a non-"OK" status.
const fakeResponse = (status: string, data: number) => of({ status, data });
var post1$ = fakeResponse("OK", 1);
var post2$ = fakeResponse("OK", 2);
var post3$ = fakeResponse("!OK", 3);
var post4$ = fakeResponse("OK", 4);
/**
 * Handles one emitted response and records progress in `chainStatus`.
 *
 * Once `chainStatus.hasError` is set, every later response is ignored.
 * Otherwise the index is advanced; an 'OK' response is forwarded to
 * `processData`, and any other status is logged and sets `hasError`.
 *
 * @param data        The emitted response.
 * @param chainStatus Mutable progress record shared across emissions.
 */
var runSubscribe = (
  data: { status: string; data: any },
  chainStatus: { index: number; hasError: boolean },
): void => {
  if (!chainStatus.hasError) {
    chainStatus.index += 1;
    if (data.status === 'OK') {
      processData(chainStatus.index, data.data);
    } else {
      console.log('error: ', data.status, data.data);
      chainStatus.hasError = true;
    }
  }
};
/**
 * Demo success handler: logs the response payload with its position in the chain.
 *
 * Replace the body with the real per-index success processing.
 *
 * @param index Zero-based position of the response in the chain.
 * @param data  Payload of the successful response.
 */
var processData = (index: number, data: unknown): void => {
  // `random` is only a demo value that gets logged (0 up to, not including, 1000).
  // The earlier version called the RxJS `delay(random)` operator here; outside
  // of a pipe that call only builds an operator function and delays nothing,
  // so it has been removed.
  const random = Math.random() * 1000;
  console.log(`${index} success:`, data, random);
};
// Progress record shared by all emissions: `index` starts at -1 so the first
// handled response gets index 0; `hasError` is set by runSubscribe on failure.
const status = { index: -1, hasError: false };

console.clear();

// Subscribe to the four sources one after another and route every emitted
// response through runSubscribe together with the shared progress record.
const chain$ = concat(post1$, post2$, post3$, post4$);
chain$.subscribe(response => runSubscribe(response, status));
处理会在第 3 个 observable 处停止,因为它的状态不是 "OK"。注意:concat 仍会订阅第 4 个 observable,只是 runSubscribe 会忽略它的结果。
我建议您使用 RxJS 的可管道(pipeable)运算符,并沿着管道链处理错误。您可以使用 switchMap 把各个请求串联起来,并使用 throwError 结束链并发出一个错误的 observable。
// Runs post1..post4 strictly one after another. Each switchMap receives the
// previous response and either starts the next request or errors the stream,
// which skips all remaining steps and goes straight to the error handler.
this.api.post1()
  .pipe(
    switchMap(data => {
      if (data.status === 'OK') {
        // Return the request observable itself so switchMap subscribes to it;
        // wrapping it in of() would emit the observable instead of its response.
        return this.api.post2();
      }
      console.log('error: ' + data.status);
      return throwError('post1 returned status ' + data.status);
    }),
    switchMap(data => {
      if (data.status === 'OK') {
        return this.api.post3();
      }
      console.log('error: ' + data.status);
      return throwError('post2 returned status ' + data.status);
    }),
    switchMap(data => {
      if (data.status === 'OK') {
        return this.api.post4();
      }
      console.log('error: ' + data.status);
      return throwError('post3 returned status ' + data.status);
    }),
    // The last response needs the same check, otherwise a failed post4 would
    // reach the success handler.
    switchMap(data => {
      if (data.status === 'OK') {
        return of(data);
      }
      console.log('error: ' + data.status);
      return throwError('post4 returned status ' + data.status);
    }),
  ).subscribe({
    next: res => {
      console.log(res);
      // do the rest here
    },
    error: error => {
      // handle error
    },
  });
我需要依次发起多个网络请求:只有当前一个请求返回的状态为 OK 时,才接着发起下一个。
下面的代码很难看,但可以正常工作:
// Sequential chain: each request is subscribed to from inside the previous
// request's callback, and only when that response reported status 'OK'.
// Any other status is logged and the chain ends there.
// Loose (in)equality is kept on purpose so the checks behave exactly as before.
this.api.post1().subscribe((first: any) => {
  if (first.status != 'OK') {
    console.log('error: ' + first.status);
    return;
  }
  this.api.post2().subscribe((second: any) => {
    if (second.status != 'OK') {
      console.log('error: ' + second.status);
      return;
    }
    this.api.post3().subscribe((third: any) => {
      if (third.status != 'OK') {
        console.log('error: ' + third.status);
        return;
      }
      this.api.post4().subscribe((fourth: any) => {
        if (fourth.status != 'OK') {
          console.log('error: ' + fourth.status);
          return;
        }
        // Do something
      });
    });
  });
});
我尝试使用 concatMap
// Build the four request observables up front.
const post1$ = this.api.post1();
const post2$ = this.api.post2();
const post3$ = this.api.post3();
const post4$ = this.api.post4();

// Emit the request observables in order; concatMap subscribes to each one
// only after the previous one has completed, and every response is logged.
const requests$ = from([post1$, post2$, post3$, post4$]);
requests$
  .pipe(concatMap(request$ => request$))
  .subscribe(response => console.log(response));
但我需要检查每个响应的状态是否为 OK。有什么好的解决办法吗?
如果任何一个响应的状态不正常,您可以像下面这样从流中抛出一个通用错误。
// Build the four request observables up front.
const post1$ = this.api.post1();
const post2$ = this.api.post2();
const post3$ = this.api.post3();
const post4$ = this.api.post4();

// concat takes the observables as separate arguments and subscribes to them
// one after another. Passing a single array instead would make concat emit
// the observables themselves as values, so `data.status` would always be
// undefined and the very first emission would be reported as an error.
concat(post1$, post2$, post3$, post4$).pipe(
  switchMap(data => {
    // A non-"OK" response errors the stream, which also prevents concat from
    // subscribing to the remaining requests.
    if (data.status !== "OK") {
      return throwError("Some error");
    }
    return of(data);
  })
).subscribe({
  next: res => console.log(res),
  // Without an error handler the thrown error would surface as an unhandled error.
  error: err => console.error(err),
});
或者:如果您需要知道具体是哪个 observable 出了错,仍然可以使用 concat,但要在把各个请求交给 concat 之前,先让每个请求的响应经过各自的状态检查。
/**
 * Creates a pipeable operator for one request.
 *
 * Responses whose `status` is "OK" are passed through unchanged; any other
 * status errors the stream with "Some error about " followed by `type`.
 *
 * @param type Label identifying the request in the error text.
 * @returns A function that takes the response observable and returns the checked one.
 */
const handleResponse = (type: string) => {
  return (responseObs) => {
    const checkStatus = switchMap(data =>
      data.status !== "OK" ? throwError("Some error about " + type) : of(data)
    );
    return responseObs.pipe(checkStatus);
  };
};
// Attach a labelled status check to every request so a failure names its source.
const post1$ = this.api.post1().pipe(handleResponse("Post 1"));
const post2$ = this.api.post2().pipe(handleResponse("Post 2"));
const post3$ = this.api.post3().pipe(handleResponse("Post 3"));
const post4$ = this.api.post4().pipe(handleResponse("Post 4"));

// The observables must be passed as separate arguments: given a single array,
// concat would emit the four observables as plain values and never subscribe
// to them, so no request would run through its status check.
concat(post1$, post2$, post3$, post4$).subscribe({
  next: res => console.log(res),
  // An errored request stops the chain; report it instead of leaving it unhandled.
  error: err => console.error(err),
});
这是另一种方法:
import { concat, of } from 'rxjs';
import { delay } from 'rxjs/operators';
// Stand-ins for the four HTTP calls: each emits one { status, data } object
// synchronously. The third one carries a non-"OK" status.
const fakeResponse = (status: string, data: number) => of({ status, data });
var post1$ = fakeResponse("OK", 1);
var post2$ = fakeResponse("OK", 2);
var post3$ = fakeResponse("!OK", 3);
var post4$ = fakeResponse("OK", 4);
/**
 * Handles one emitted response and records progress in `chainStatus`.
 *
 * Once `chainStatus.hasError` is set, every later response is ignored.
 * Otherwise the index is advanced; an 'OK' response is forwarded to
 * `processData`, and any other status is logged and sets `hasError`.
 *
 * @param data        The emitted response.
 * @param chainStatus Mutable progress record shared across emissions.
 */
var runSubscribe = (
  data: { status: string; data: any },
  chainStatus: { index: number; hasError: boolean },
): void => {
  if (!chainStatus.hasError) {
    chainStatus.index += 1;
    if (data.status === 'OK') {
      processData(chainStatus.index, data.data);
    } else {
      console.log('error: ', data.status, data.data);
      chainStatus.hasError = true;
    }
  }
};
/**
 * Demo success handler: logs the response payload with its position in the chain.
 *
 * Replace the body with the real per-index success processing.
 *
 * @param index Zero-based position of the response in the chain.
 * @param data  Payload of the successful response.
 */
var processData = (index: number, data: unknown): void => {
  // `random` is only a demo value that gets logged (0 up to, not including, 1000).
  // The earlier version called the RxJS `delay(random)` operator here; outside
  // of a pipe that call only builds an operator function and delays nothing,
  // so it has been removed.
  const random = Math.random() * 1000;
  console.log(`${index} success:`, data, random);
};
// Progress record shared by all emissions: `index` starts at -1 so the first
// handled response gets index 0; `hasError` is set by runSubscribe on failure.
const status = { index: -1, hasError: false };

console.clear();

// Subscribe to the four sources one after another and route every emitted
// response through runSubscribe together with the shared progress record.
const chain$ = concat(post1$, post2$, post3$, post4$);
chain$.subscribe(response => runSubscribe(response, status));
处理会在第 3 个 observable 处停止,因为它的状态不是 "OK"。注意:concat 仍会订阅第 4 个 observable,只是 runSubscribe 会忽略它的结果。
我建议您使用 RxJS 的可管道(pipeable)运算符,并沿着管道链处理错误。您可以使用 switchMap 把各个请求串联起来,并使用 throwError 结束链并发出一个错误的 observable。
// Runs post1..post4 strictly one after another. Each switchMap receives the
// previous response and either starts the next request or errors the stream,
// which skips all remaining steps and goes straight to the error handler.
this.api.post1()
  .pipe(
    switchMap(data => {
      if (data.status === 'OK') {
        // Return the request observable itself so switchMap subscribes to it;
        // wrapping it in of() would emit the observable instead of its response.
        return this.api.post2();
      }
      console.log('error: ' + data.status);
      return throwError('post1 returned status ' + data.status);
    }),
    switchMap(data => {
      if (data.status === 'OK') {
        return this.api.post3();
      }
      console.log('error: ' + data.status);
      return throwError('post2 returned status ' + data.status);
    }),
    switchMap(data => {
      if (data.status === 'OK') {
        return this.api.post4();
      }
      console.log('error: ' + data.status);
      return throwError('post3 returned status ' + data.status);
    }),
    // The last response needs the same check, otherwise a failed post4 would
    // reach the success handler.
    switchMap(data => {
      if (data.status === 'OK') {
        return of(data);
      }
      console.log('error: ' + data.status);
      return throwError('post4 returned status ' + data.status);
    }),
  ).subscribe({
    next: res => {
      console.log(res);
      // do the rest here
    },
    error: error => {
      // handle error
    },
  });