使用项目反应器处理多个外部服务
Handle multiple external services using project reactor
我是反应式编程的新手,并尝试使用项目反应器模拟以下用例,但我发现将响应从一个服务调用传递到另一个依赖服务有点困难。任何建议或参考将不胜感激。
响应getDetails(请求inputRequest){
//Call two external services parallel based on the incoming request
Response1 = callExternalService1(inputRequest)
Response2 = callExternalService2(inputRequest)
//Call one external service based on the Response1 and Response2
Response3 = callExternalService3(Response1,Response2);
//Call three external service parallel based on the Response1, Response2 and Response3
Response4 = callExternalService4(Response1,Response2,Response3);
Response5 = callExternalService5(Response1,Response2,Response3);
Response6 = callExternalService6(Response1,Response2,Response3);
//Last service call
Response finalResponse= callLastExternalService(Response4,Response5,Response6);
return finalResponse;
}
我尝试了以下示例,它对一个服务调用有效,但无法将响应传递给其他相关服务调用。
更新的答案:
Mono<Response> getDetails(Request inputRequest){
return Mono.just(inputRequest.getId())
.flatMap(id->{
DbResponse res = getDBCall(id).block();
if(res == null){
return Mono.error(new DBException(....));
}
return Mono.zip(callExternalService1(res),callExternalService2(inputRequest));
}).flatMap(response->{
Response extser1 = response.getT1();
Response extser2 = response.getT2();
//any exceptions?
return Mono.zip(Mono.just(extser1),Mono.just(extser2),callExternalService3();
}).flatMap(response->callExternalService4(response.getT1(),response.getT2(),response.getT3())
});
}
private Mono<DbResponse> getDBCall(String id) {
return Mono.fromCallable(()->dbservice.get(id))
.subscribeOn(Schedulers.boundedElastic());
}
问题:
- 如何在不使用块的情况下将 Mono
转换为 DbResponse
操作?
- 如果任何外部服务失败,如何构建
平面图中的失败响应和 return 返回?
如果你的服务 return 单声道响应(否则你必须转换它们),你可以使用 zip 进行并行调用:
Mono.zip( callExternalService1( inputRequest ),
callExternalService2( inputRequest ) )
.flatMap( resp1AndResp2 -> this.callExternalService3( resp1AndResp2.getT1(),
resp1AndResp2.getT2() )
.flatMap( response3 -> Mono.zip( callExternalService4( resp1AndResp2.getT1(),
resp1AndResp2.getT2(),
response3 ),
callExternalService5( resp1AndResp2.getT1(),
resp1AndResp2.getT2(),
response3 ),
callExternalService6( resp1AndResp2.getT1(),
resp1AndResp2.getT2(),
response3 ) )
.flatMap( resp4AndResp5AndResp6 -> callLastExternalService( resp4AndResp5AndResp6.getT1(),
resp4AndResp5AndResp6.getT2(),
resp4AndResp5AndResp6.getT3() ) ) ) );
如果您有 n 个呼叫并且您想要步进(也就是说,如果您有所有呼叫的响应则继续前进),请使用 zip
。例如:
Mono.zip(call1, call2)
.flatMap(tuple2 -> {
ResponseEntity<?> r1 = tuple2.getT1(); //response from call1
ResponseEntity<?> r2 = tuple2.getT2(); //response from call2
return Mono.zip(Mono.just(r1), Mono.just(r2), call3);
})
.flatMap(tuple3 -> {
//at this point, you have r1, r2, r3. tuple3.getT1() response from call 1
return Mono.zip(call4, call5, call6); //tuple3.getT2() response from call 2, tuple3.getT3() response from call3
})
.flatMap(tuple3 -> callLastService);
注意:如果是伪代码,不会马上编译
您可以扩展以上内容来回答您自己的问题。请注意,由于 call1
和 call2
是独立的,您可以使用 subscribeOn(Schedulers.boundedElastic())
并行 运行 它们
编辑:回答两个后续问题:
无需使用 block()
进行订阅,因为 flatMap
会热切地订阅您的内部流。你可以这样做:
Mono.just(inputRequest.getId())
.flatMap(a -> getDBCall(a).switchIfEmpty(Mono.defer(() -> Mono.error(..))))
注意:如果可调用 returns 为空,则 Mono.callable(..)
returns 为空流。这就是为什么 switchIfEmpty
- 您可以使用
onErrorResume
等运算符来提供后备流。参见:
我是反应式编程的新手,并尝试使用项目反应器模拟以下用例,但我发现将响应从一个服务调用传递到另一个依赖服务有点困难。任何建议或参考将不胜感激。
响应getDetails(请求inputRequest){
//Call two external services parallel based on the incoming request
Response1 = callExternalService1(inputRequest)
Response2 = callExternalService2(inputRequest)
//Call one external service based on the Response1 and Response2
Response3 = callExternalService3(Response1,Response2);
//Call three external service parallel based on the Response1, Response2 and Response3
Response4 = callExternalService4(Response1,Response2,Response3);
Response5 = callExternalService5(Response1,Response2,Response3);
Response6 = callExternalService6(Response1,Response2,Response3);
//Last service call
Response finalResponse= callLastExternalService(Response4,Response5,Response6);
return finalResponse;
}
我尝试了以下示例,它对一个服务调用有效,但无法将响应传递给其他相关服务调用。
更新的答案:
Mono<Response> getDetails(Request inputRequest){
return Mono.just(inputRequest.getId())
.flatMap(id->{
DbResponse res = getDBCall(id).block();
if(res == null){
return Mono.error(new DBException(....));
}
return Mono.zip(callExternalService1(res),callExternalService2(inputRequest));
}).flatMap(response->{
Response extser1 = response.getT1();
Response extser2 = response.getT2();
//any exceptions?
return Mono.zip(Mono.just(extser1),Mono.just(extser2),callExternalService3();
}).flatMap(response->callExternalService4(response.getT1(),response.getT2(),response.getT3())
});
}
private Mono<DbResponse> getDBCall(String id) {
return Mono.fromCallable(()->dbservice.get(id))
.subscribeOn(Schedulers.boundedElastic());
}
问题:
- 如何在不使用块的情况下将 Mono
转换为 DbResponse 操作? - 如果任何外部服务失败,如何构建 平面图中的失败响应和 return 返回?
如果你的服务 return 单声道响应(否则你必须转换它们),你可以使用 zip 进行并行调用:
Mono.zip( callExternalService1( inputRequest ),
callExternalService2( inputRequest ) )
.flatMap( resp1AndResp2 -> this.callExternalService3( resp1AndResp2.getT1(),
resp1AndResp2.getT2() )
.flatMap( response3 -> Mono.zip( callExternalService4( resp1AndResp2.getT1(),
resp1AndResp2.getT2(),
response3 ),
callExternalService5( resp1AndResp2.getT1(),
resp1AndResp2.getT2(),
response3 ),
callExternalService6( resp1AndResp2.getT1(),
resp1AndResp2.getT2(),
response3 ) )
.flatMap( resp4AndResp5AndResp6 -> callLastExternalService( resp4AndResp5AndResp6.getT1(),
resp4AndResp5AndResp6.getT2(),
resp4AndResp5AndResp6.getT3() ) ) ) );
如果您有 n 个呼叫并且您想要步进(也就是说,如果您有所有呼叫的响应则继续前进),请使用 zip
。例如:
Mono.zip(call1, call2)
.flatMap(tuple2 -> {
ResponseEntity<?> r1 = tuple2.getT1(); //response from call1
ResponseEntity<?> r2 = tuple2.getT2(); //response from call2
return Mono.zip(Mono.just(r1), Mono.just(r2), call3);
})
.flatMap(tuple3 -> {
//at this point, you have r1, r2, r3. tuple3.getT1() response from call 1
return Mono.zip(call4, call5, call6); //tuple3.getT2() response from call 2, tuple3.getT3() response from call3
})
.flatMap(tuple3 -> callLastService);
注意:如果是伪代码,不会马上编译
您可以扩展以上内容来回答您自己的问题。请注意,由于 call1
和 call2
是独立的,您可以使用 subscribeOn(Schedulers.boundedElastic())
编辑:回答两个后续问题:
无需使用
block()
进行订阅,因为flatMap
会热切地订阅您的内部流。你可以这样做:Mono.just(inputRequest.getId()) .flatMap(a -> getDBCall(a).switchIfEmpty(Mono.defer(() -> Mono.error(..))))
注意:如果可调用 returns 为空,则 Mono.callable(..)
returns 为空流。这就是为什么 switchIfEmpty
- 您可以使用
onErrorResume
等运算符来提供后备流。参见: