WebFlux 链接以调用多个服务和响应聚合

WebFlux chaining to call multiple services and response aggregation

我最近开始使用 WebFlux,需要有关如何链接多个服务和聚合响应的建议。 4个服务和它们的响应POJO类似于下面的例子:

class Response1{
   String a1;
   String a2;
}

class Response2{
   String b1;
}

class Response3{
   String c1;
}

class Response4{
   String d1;
}

和4个服务的签名:

Flux<Response1> service1(); 
Flux<Response2> service2(String a1); //output field of Response1(service 1)
Flux<Response3> service3(String b1); //output field of Response2(service 2)
Mono<Response4> service4(String a2); //output field of Response1(service 1)

因此需要为Flux中的每个Response1调用service2,为每个Response2调用service3。模型之间的关系是:

Response1 <1-----*>Response2 (1 to many), 
Response2 <1-----*>Response3 (1 to many),
Response1 <1-----1>Response4 (1 to 1)

汇总的最终响应应类似于 (JSON):

[
  {
    "a1": "",
    "a2": "",
    "d1": "",
    "response2s": [
      {
        "b1": "",
        "response3s": [
          {
            "c1": ""
          }
        ]
      }
    ]
  }
]

所以首先我需要调用 Service1,然后为每个 Response1 调用 service2,然后为每个 Response2(由 service2 返回)调用 service3。此外,为 service1 返回的每个 response1 调用 service4(可以与 service2 和 service3 调用并行调用)。为了更新聚合的最终响应,我添加了两个额外的 POJO 以允许存储子响应,例如(相关位):

public class AggResponse extends Response1{
    List<AggResponse2> response2s;// populated from service2 response
    String d1; // populated from service4 response

    public void add(AggResponse2 res2){
        if(response2s == null)
            response2s = new ArrayList<>();
        response2s.add(res2);
    }
}

public class AggResponse2 extends Response2{
    List<Response3> response3s;// populated from service3 response

    public void add(Response3 res3) {
        if (response3s == null)
            response3s = new ArrayList<>();
        response3s.add(res3);
    }
}

如何最好地进行链接以便我保留以前的响应数据并在组合运算符时保留 AggResponse 对象中的所有数据?我试过以下:

public Flux<AggResponse> aggregate() {
    return services.service1()
            .map(res1 -> new AggResponse(res1.getA1(), res1.getA2()))
            .flatMap(aggRes -> services.service2(aggRes.getA1())
                    .map(res2 -> {
                        AggResponse2 aggRes2 = new AggResponse2(res2.getB1());
                        aggRes.add(aggRes2);
                        return aggRes2;
                    })
                    .flatMap(aggRes2 -> services.service3(aggRes2.getB1())
                            .map(res3 -> {
                                aggRes2.add(res3);
                                return res3;
                            })
                            .reduce(aggRes2, (a, aggRes3) -> aggRes2)
                    )
                    .reduce(aggRes, (a, aggRes2) -> aggRes)
            )
            .flatMap(aggRes -> services.service4(aggRes.getA1())
                    .map(res4 -> {
                        aggRes.setD1(res4.getD1());
                        return aggRes;
                    })
            );
}

但是,我收到以下不完整的回复:

[ {
  "a1" : "a1v1",
  "a2" : "a2v1"
} ]

我看到日志时看到所有服务都被调出。两个问题: 1.为什么看不到聚合响应,可以减少丢失吗? 2. 有没有更好的方法来实现这个?

如果您不想等待 service2next 信号给您的 service4,您可以使用 merge 方法。像这样:

return service1().flatMap(response1 ->
        Flux.merge(service23Agg(response1.a1), service4Agg(response1.a2))
                .reduce((aggResponse, aggResponse2) -> new AggResponse(
                        response1.a1,
                        response1.a2,
                        Optional.ofNullable(aggResponse.d1)
                                .orElse(aggResponse2.d1),
                        Optional.ofNullable(aggResponse.response2s)
                                .orElse(aggResponse2.response2s))));

实用程序类和方法:

class AggContainer {
    final String b1;
    final List<Response3> response3s;

    AggContainer(String b1, List<Response3> response3s) {
        this.b1 = b1;
        this.response3s = response3s;
    }
}

class AggResponse {
    final String a1;
    final String a2;
    final String d1;
    final List<AggContainer> response2s;

    AggResponse(String a1, String a2, String d1, List<AggContainer> response2s) {
        this.a1 = a1;
        this.a2 = a2;
        this.d1 = d1;
        this.response2s = response2s;
    }

    AggResponse(String d1) {
        this.a1 = null;
        this.a2 = null;
        this.d1 = d1;
        this.response2s = null;
    }

    AggResponse(List<AggContainer> response2s) {
        this.a1 = null;
        this.a2 = null;
        this.d1 = null;
        this.response2s = response2s;
    }
}

private Mono<AggResponse> service23Agg(String a1) {
    return service2(a1).flatMap(response2 -> service3(response2.b1).collectList()
            .map(response3s -> new AggContainer(response2.b1, response3s)))
            .collectList()
            .map(AggResponse::new);
}

private Mono<AggResponse> service4Agg(String a2) {
    return service4(a2).map(response4 -> new AggResponse(response4.d1));
}

并且您应该非常小心异步环境中的可变集合。避免在反应管道内更改它。

使用 Flux.combineLatest() 看起来更简单一些。

 service1().flatMap(response1 -> Flux.combineLatest(
      service2Service3(response1.a1).collectList(), // call service2 which call service3
      service4(response1.a2),                       // call service4
      (aggResponse2, response4)->{
                    //log.info(agg);
                    //log.info(response4);
                    FinalResponse agg = FinalResponse.builder()
                            .a1(response1.a1)
                            .a2(response1.a2)
                            .d1(response4.d1)
                            .response2s(aggResponse2)
                            .build();
                    return agg;
                })
            )

这里是完整的源代码。

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.*;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;

@Log4j2
public class CombiningTests {
    @Data
    @AllArgsConstructor
    @ToString
    static class Response1{
        String a1;
        String a2;
    }

    @Data
    @AllArgsConstructor
    @ToString
    static class Response2{
        String b1;
        public Response2(){
        }
    }
    @Data
    @AllArgsConstructor
    @ToString
    static class Response3{
        String c1;
    }
    @Data
    @AllArgsConstructor
    @ToString
    static class Response4{
        String d1;
    }

    @Data
    @ToString(callSuper=true)
    static class AggResponse2 extends Response2{
        List<Response3> response3s;
        public AggResponse2(String b1, List<Response3> response3s) {
            super(b1);
            this.response3s = response3s;
        }
    }

    @Data
    @ToString
    @Builder
    static class FinalResponse {
        final String a1;
        final String a2;
        final String d1;
        final List<AggResponse2> response2s;
    }

    static Flux<Response1> service1(){
        //
        return Flux
                .just(new Response1("a1", "a2"))
                .delayElements(Duration.ofMillis(4));
    }
    static Flux<Response2> service2(String a1){
        //
        return Flux
                .just(new Response2("b1-" + a1), new Response2("b2-" + a1))
                .delayElements(Duration.ofMillis(3));
    }
    static Flux<Response3> service3(String b1){
        //
        return Flux
                .just(new Response3("c1-" + b1), new Response3("c2-" + b1))
                .delayElements(Duration.ofMillis(5));
    }
    static Mono<Response4> service4(String a2){
        //
        return Mono
                .just(new Response4("d1-" + a2))
                .delayElement(Duration.ofMillis(8));
    }

    static Flux<AggResponse2> service2Service3(String a1){
        return service2(a1)
                .flatMap(
                        x2->service3(x2.b1)
                                .collectList()
                                .map(x3->new AggResponse2(x2.b1, x3))
                );
    }
    /**
     * service1() 1 -----> * service2() 1 --> * service3()
     *                |--> 1 service4()
     */
    @Test
    void testComplexCombineLatest(){
        ObjectMapper objectMapper = new ObjectMapper();
        service1().flatMap(response1 -> Flux.combineLatest(
                service2Service3(response1.a1).collectList(), // call service2 which call service3
                service4(response1.a2),                       // call service4
                (aggResponse2, response4)->{
                    //log.info(agg);
                    //log.info(response4);
                    FinalResponse agg = FinalResponse.builder()
                            .a1(response1.a1)
                            .a2(response1.a2)
                            .d1(response4.d1)
                            .response2s(aggResponse2)
                            .build();
                    return agg;
                })
            ).doOnNext(e->{
                    try {
                        String json = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(e);
                        System.out.println("JSON = " + json);
                    } catch (Exception ex) {
                        //e.printStackTrace();
                    }
            }).blockLast();
    }
}

这里是json格式的执行结果:

JSON = {
  "a1" : "a1",
  "a2" : "a2",
  "d1" : "d1-a2",
  "response2s" : [ {
    "b1" : "b1-a1",
    "response3s" : [ {
      "c1" : "c1-b1-a1"
    }, {
      "c1" : "c2-b1-a1"
    } ]
  }, {
    "b1" : "b2-a1",
    "response3s" : [ {
      "c1" : "c1-b2-a1"
    }, {
      "c1" : "c2-b2-a1"
    } ]
  } ]
}