需要帮助在 for 循环中执行顺序 api 调用
Need help to execute sequential api calls in for-loop
我正在使用 Angular 和 NodeJs(带有 Axios)使用 RxJs,目前发现完成这项任务非常具有挑战性。我会根据场景解释我的问题。
我有一个这样的对象数组,可能有超过 100 个对象:
let objArr = [{name: 'john', id: '123', country: 'usa'}, {name: 'doe', id: '456', country: 'china'}....]
然后我有另一个 4-5 验证 APIs 可以调用不同的参数,例如基于每个对象的 ID、名称和国家/地区:
api_1 = "validate_step_1/:id"
api_2 = "validate_step_2/:id/:name"
api_3 = "validate_step_3/:id/:country"
api_4 = "validate_step_4/:id:/:name/:country"
这些 API 调用应该严格按照顺序模式一个接一个地发生,例如api_2 只应在 api_1 returns 为真等情况下调用。
我想要的:
我想在应该 运行 并行的数组上执行 for 循环,然后每个对象应该根据这 4 个 API 调用顺序验证自己。对于所有 100 个对象,基于 for 循环中的每个项目的顺序 API 调用是并行的。
这可能吗?此外,也欢迎任何在节点端实现此目的的解决方案。
我试过的
目前我正在使用这种方法,但是速度很慢,甚至有时会导致axios超时错误:
of(...this.objArr).pipe(
concatMap((obj: any) => this.service.api_1(id)),
concatMap((obj: any) => this.service.api_2(id, name)),
concatMap((obj: any) => this.service.api_3(id, country)),
concatMap((obj: any) => this.service.api_4(id, name, country)),
catchError( error => this.handleError(error))
).subscribe(
success => {
console.log("validation succeed", success);
},
errorData => {
console.log("validation failure: ", errorData);
}
)
将您的数据数组切换到连接在一起的平行管道流中。
const dataArray: myData[];
of(dataArray).pipe(
// switch and listen to the returned pipe
switchMap(arrayOfObjects => {
// a normal map (not an RxJS map), returns an array of pipes
const nParallelPipes = arrayOfObjects.map(data => {
const test1 = this.service.api_1(data.id);
const test2 = this.service.api_2(data.id, data.name);
const test3 = this.service.api_3(data.id, data.country);
const test4 = this.service.api_4(data.id, data.name, data.country);
return concat(test1, test2, test3, test4).pipe(
catchError(error => this.handleError(error))
)
});
// listen to all pipes in parallel, and only return when all pipes have completed
return forkJoin(nParallelPipes) // or combineLatest(nParallelPipes).pipe(take(1))
})
).subscribe(
success => {
console.log("validation succeed", success);
},
errorData => {
console.log("validation failure: ", errorData);
}
);
nParallelPipes
包含 n 个同步顺序验证管道,其中 n 是初始数组中的数据量。 ForkJoining them together (or combineLatest) will fire them off the parallel.
希望对您有所帮助。
编码愉快。
Is it possible to apply certain conditional checks on the response of API before moving to the next?
是的。有可能。
const dataArray: string[];
const checkTest1 = (value) => true;
const checkTest2 = (value) => true;
const checkTest3 = (value) => true;
const checkTest4 = (value) => true;
of(dataArray).pipe(
// switch and listen to the returned pipe
switchMap(arrayOfObjects => {
// a normal map (not an RxJS map), returns an array of pipes
const nParallelPipes = (arrayOfObjects as string[]).map(data => {
const test1 = this.service.api_1(data.id);
const test2 = this.service.api_2(data.id, data.name);
const test3 = this.service.api_3(data.id, data.country);
const test4 = this.service.api_4(data.id, data.name, data.country);
const startTesting = concatMap(value => iif(() => checkTest1(value), test1Passed, throwError('T1 Failed')))
const test1Passed = test2.pipe(concatMap(value => iif(() => checkTest2(value), test2Passed, throwError('T2 Failed'))));
const test2Passed = test3.pipe(concatMap(value => iif(() => checkTest3(value), test3Passed, throwError('T3 Failed'))));
const test3Passed = test4.pipe(concatMap(value => iif(() => checkTest4(value), of('success'), throwError('T4 Failed'))))
return test1.pipe(startTesting);
});
// listen to all pipes in parallel, and only return when all pipes have completed
return forkJoin(nParallelPipes) // or combineLatest(parallelCalls).pipe(take(1))
})
).subscribe(
success => {
console.log("validation succeed", success);
},
errorData => {
console.log("validation failure: ", errorData);
}
);
你可以使用 forkJoin
运算符来调用多个端点,以防你只对所有端点的最终响应感兴趣,而不是未来的发射,它需要一个可观察数组并给你一个数组响应然后完成...像这样:
const obs1$ = this.service.api1;
const obs2$ = this.service.api2;
const subscription = forkJoin([obs1$,obs2$].subscribe(([response1, response2]) => console.log(response1, response2))
但如果您期望一个或多个可观察量产生更多排放,您可以使用 combineLatest
或最好使用 zip
运算符。
我会采用这种方法。
首先创建一个构建 Observable 的函数,该函数 运行 并行进行所有验证并处理我们可能遇到的任何错误(稍后会详细介绍错误)。像这样
function validate(id, name, country) {
return concat(
this.service.api1(id),
this.service.api2(id, name),
this.service.api3(id, country),
this.service.api4(id, name, country)
).pipe(
catchError(err => of(err.message))
);
}
然后我将使用 rxjs 库中的 from
函数将对象数组转换为流,然后将 mergeMap
运算符应用于并行启动所有验证,像这样
from(objArr)
.pipe(mergeMap((obj) => validate(obj.id, obj.name, obj.country)))
.subscribe((v) => console.log(v));
为什么在这种情况下使用 mergeMap
而不是 forkJoin
。主要原因是使用 mergeMap
你可以控制你想要的并发级别。如果您不指定任何内容,则所有验证 运行 并行进行。但是,例如,如果你想限制并行验证的数量,你可以像这样使用 mergeMap
的可选 concurrent
参数
const concurrent = 3 // whatever limit
from(objArr)
.pipe(mergeMap((obj) => validate(obj.id, obj.name, obj.country), concurrent))
.subscribe((v) => console.log(v));
如果您只想在满足特定条件时继续进行验证,您可以简单地使用 tap
运算符,如果条件不满足则 throw
出错。该错误将被我们在 validate
函数中 pipe
末尾添加的 catchError
运算符捕获。所以,验证 API 看起来像这样
api3(id, country) {
return this.invokeApi3(id, country).pipe(
tap(() => {
if (// check the criteria for which you do not want to continue) {
throw new Error('Error in API 3');
}
})
);
}
您可以查看 this stackblitz 以获得 examp.le
我正在使用 Angular 和 NodeJs(带有 Axios)使用 RxJs,目前发现完成这项任务非常具有挑战性。我会根据场景解释我的问题。
我有一个这样的对象数组,可能有超过 100 个对象:
let objArr = [{name: 'john', id: '123', country: 'usa'}, {name: 'doe', id: '456', country: 'china'}....]
然后我有另一个 4-5 验证 APIs 可以调用不同的参数,例如基于每个对象的 ID、名称和国家/地区:
api_1 = "validate_step_1/:id"
api_2 = "validate_step_2/:id/:name"
api_3 = "validate_step_3/:id/:country"
api_4 = "validate_step_4/:id:/:name/:country"
这些 API 调用应该严格按照顺序模式一个接一个地发生,例如api_2 只应在 api_1 returns 为真等情况下调用。
我想要的:
我想在应该 运行 并行的数组上执行 for 循环,然后每个对象应该根据这 4 个 API 调用顺序验证自己。对于所有 100 个对象,基于 for 循环中的每个项目的顺序 API 调用是并行的。
这可能吗?此外,也欢迎任何在节点端实现此目的的解决方案。
我试过的
目前我正在使用这种方法,但是速度很慢,甚至有时会导致axios超时错误:
of(...this.objArr).pipe(
concatMap((obj: any) => this.service.api_1(id)),
concatMap((obj: any) => this.service.api_2(id, name)),
concatMap((obj: any) => this.service.api_3(id, country)),
concatMap((obj: any) => this.service.api_4(id, name, country)),
catchError( error => this.handleError(error))
).subscribe(
success => {
console.log("validation succeed", success);
},
errorData => {
console.log("validation failure: ", errorData);
}
)
将您的数据数组切换到连接在一起的平行管道流中。
const dataArray: myData[];
of(dataArray).pipe(
// switch and listen to the returned pipe
switchMap(arrayOfObjects => {
// a normal map (not an RxJS map), returns an array of pipes
const nParallelPipes = arrayOfObjects.map(data => {
const test1 = this.service.api_1(data.id);
const test2 = this.service.api_2(data.id, data.name);
const test3 = this.service.api_3(data.id, data.country);
const test4 = this.service.api_4(data.id, data.name, data.country);
return concat(test1, test2, test3, test4).pipe(
catchError(error => this.handleError(error))
)
});
// listen to all pipes in parallel, and only return when all pipes have completed
return forkJoin(nParallelPipes) // or combineLatest(nParallelPipes).pipe(take(1))
})
).subscribe(
success => {
console.log("validation succeed", success);
},
errorData => {
console.log("validation failure: ", errorData);
}
);
nParallelPipes
包含 n 个同步顺序验证管道,其中 n 是初始数组中的数据量。 ForkJoining them together (or combineLatest) will fire them off the parallel.
希望对您有所帮助。
编码愉快。
Is it possible to apply certain conditional checks on the response of API before moving to the next?
是的。有可能。
const dataArray: string[];
const checkTest1 = (value) => true;
const checkTest2 = (value) => true;
const checkTest3 = (value) => true;
const checkTest4 = (value) => true;
of(dataArray).pipe(
// switch and listen to the returned pipe
switchMap(arrayOfObjects => {
// a normal map (not an RxJS map), returns an array of pipes
const nParallelPipes = (arrayOfObjects as string[]).map(data => {
const test1 = this.service.api_1(data.id);
const test2 = this.service.api_2(data.id, data.name);
const test3 = this.service.api_3(data.id, data.country);
const test4 = this.service.api_4(data.id, data.name, data.country);
const startTesting = concatMap(value => iif(() => checkTest1(value), test1Passed, throwError('T1 Failed')))
const test1Passed = test2.pipe(concatMap(value => iif(() => checkTest2(value), test2Passed, throwError('T2 Failed'))));
const test2Passed = test3.pipe(concatMap(value => iif(() => checkTest3(value), test3Passed, throwError('T3 Failed'))));
const test3Passed = test4.pipe(concatMap(value => iif(() => checkTest4(value), of('success'), throwError('T4 Failed'))))
return test1.pipe(startTesting);
});
// listen to all pipes in parallel, and only return when all pipes have completed
return forkJoin(nParallelPipes) // or combineLatest(parallelCalls).pipe(take(1))
})
).subscribe(
success => {
console.log("validation succeed", success);
},
errorData => {
console.log("validation failure: ", errorData);
}
);
你可以使用 forkJoin
运算符来调用多个端点,以防你只对所有端点的最终响应感兴趣,而不是未来的发射,它需要一个可观察数组并给你一个数组响应然后完成...像这样:
const obs1$ = this.service.api1;
const obs2$ = this.service.api2;
const subscription = forkJoin([obs1$,obs2$].subscribe(([response1, response2]) => console.log(response1, response2))
但如果您期望一个或多个可观察量产生更多排放,您可以使用 combineLatest
或最好使用 zip
运算符。
我会采用这种方法。
首先创建一个构建 Observable 的函数,该函数 运行 并行进行所有验证并处理我们可能遇到的任何错误(稍后会详细介绍错误)。像这样
function validate(id, name, country) {
return concat(
this.service.api1(id),
this.service.api2(id, name),
this.service.api3(id, country),
this.service.api4(id, name, country)
).pipe(
catchError(err => of(err.message))
);
}
然后我将使用 rxjs 库中的 from
函数将对象数组转换为流,然后将 mergeMap
运算符应用于并行启动所有验证,像这样
from(objArr)
.pipe(mergeMap((obj) => validate(obj.id, obj.name, obj.country)))
.subscribe((v) => console.log(v));
为什么在这种情况下使用 mergeMap
而不是 forkJoin
。主要原因是使用 mergeMap
你可以控制你想要的并发级别。如果您不指定任何内容,则所有验证 运行 并行进行。但是,例如,如果你想限制并行验证的数量,你可以像这样使用 mergeMap
的可选 concurrent
参数
const concurrent = 3 // whatever limit
from(objArr)
.pipe(mergeMap((obj) => validate(obj.id, obj.name, obj.country), concurrent))
.subscribe((v) => console.log(v));
如果您只想在满足特定条件时继续进行验证,您可以简单地使用 tap
运算符,如果条件不满足则 throw
出错。该错误将被我们在 validate
函数中 pipe
末尾添加的 catchError
运算符捕获。所以,验证 API 看起来像这样
api3(id, country) {
return this.invokeApi3(id, country).pipe(
tap(() => {
if (// check the criteria for which you do not want to continue) {
throw new Error('Error in API 3');
}
})
);
}
您可以查看 this stackblitz 以获得 examp.le