使用项目反应器处理多个外部服务

Handle multiple external services using project reactor

我是反应式编程的新手,并尝试使用项目反应器模拟以下用例,但我发现将响应从一个服务调用传递到另一个依赖服务有点困难。任何建议或参考将不胜感激。

响应getDetails(请求inputRequest){

    //Call two external services parallel based on the incoming request 
        Response1 = callExternalService1(inputRequest)
        Response2 = callExternalService2(inputRequest)


    //Call one external service based on the Response1 and Response2
        Response3 = callExternalService3(Response1,Response2);


    //Call three external service parallel based on the Response1, Response2 and Response3
        Response4 = callExternalService4(Response1,Response2,Response3);
        Response5 = callExternalService5(Response1,Response2,Response3);
        Response6 = callExternalService6(Response1,Response2,Response3);


    //Last service call
       Response finalResponse= callLastExternalService(Response4,Response5,Response6);
       return finalResponse;

}

我尝试了以下示例,它对一个服务调用有效,但无法将响应传递给其他相关服务调用。

更新的答案:

Mono<Response> getDetails(Request inputRequest){
    
    return Mono.just(inputRequest.getId())
                    .flatMap(id->{
                        DbResponse res = getDBCall(id).block();
                        if(res == null){
                            return Mono.error(new DBException(....));
                        }
                        return Mono.zip(callExternalService1(res),callExternalService2(inputRequest));
                    }).flatMap(response->{
                        Response extser1 = response.getT1();
                        Response extser2 = response.getT2();
                        //any exceptions?
                        return Mono.zip(Mono.just(extser1),Mono.just(extser2),callExternalService3();
                    }).flatMap(response->callExternalService4(response.getT1(),response.getT2(),response.getT3())
                    });
}

private Mono<DbResponse> getDBCall(String id) {
        return Mono.fromCallable(()->dbservice.get(id))
                .subscribeOn(Schedulers.boundedElastic());
}

问题:

  1. 如何在不使用块的情况下将 Mono 转换为 DbResponse 操作?
  2. 如果任何外部服务失败,如何构建 平面图中的失败响应和 return 返回?

如果你的服务 return 单声道响应(否则你必须转换它们),你可以使用 zip 进行并行调用:

    Mono.zip( callExternalService1( inputRequest ),
              callExternalService2( inputRequest ) )
        .flatMap( resp1AndResp2 -> this.callExternalService3( resp1AndResp2.getT1(),
                                                              resp1AndResp2.getT2() )
                                       .flatMap( response3 -> Mono.zip( callExternalService4( resp1AndResp2.getT1(),
                                                                                              resp1AndResp2.getT2(),
                                                                                              response3 ),
                                                                        callExternalService5( resp1AndResp2.getT1(),
                                                                                              resp1AndResp2.getT2(),
                                                                                              response3 ),
                                                                        callExternalService6( resp1AndResp2.getT1(),
                                                                                              resp1AndResp2.getT2(),
                                                                                              response3 ) )
                                                                  .flatMap( resp4AndResp5AndResp6 -> callLastExternalService( resp4AndResp5AndResp6.getT1(),
                                                                                                                              resp4AndResp5AndResp6.getT2(),
                                                                                                                              resp4AndResp5AndResp6.getT3() ) ) ) );

如果您有 n 个呼叫并且您想要步进(也就是说,如果您有所有呼叫的响应则继续前进),请使用 zip。例如:

Mono.zip(call1, call2)
    .flatMap(tuple2 -> {
         ResponseEntity<?> r1 = tuple2.getT1();    //response from call1
         ResponseEntity<?> r2 = tuple2.getT2();    //response from call2
         return Mono.zip(Mono.just(r1), Mono.just(r2), call3);
     })
    .flatMap(tuple3 -> {
         //at this point, you have r1, r2, r3. tuple3.getT1() response from call 1
         return Mono.zip(call4, call5, call6);   //tuple3.getT2() response from call 2, tuple3.getT3() response from call3
     })
    .flatMap(tuple3 -> callLastService);

注意:如果是伪代码,不会马上编译

您可以扩展以上内容来回答您自己的问题。请注意,由于 call1call2 是独立的,您可以使用 subscribeOn(Schedulers.boundedElastic())

并行 运行 它们

编辑:回答两个后续问题:

  1. 无需使用 block() 进行订阅,因为 flatMap 会热切地订阅您的内部流。你可以这样做:

     Mono.just(inputRequest.getId())
         .flatMap(a -> getDBCall(a).switchIfEmpty(Mono.defer(() -> Mono.error(..))))
    

注意:如果可调用 returns 为空,则 Mono.callable(..) returns 为空流。这就是为什么 switchIfEmpty

  1. 您可以使用 onErrorResume 等运算符来提供后备流。参见: