使用 Webclient - Webflux 将两个 api 响应合并为一个

Merge two api responses into one using Webclient - Webflux

我正在使用 WebFlux 和 WebClient,我需要使用两个 API 并合并其响应。

第一个 API 接收类型和文档编号,return 是一个列表,其中一个元素包含客户数据(这就是它的定义方式)。

第二个 API 收到客户 ID 和 return 客户付款列表。

我需要使用这两个 API 和 return 具有客户数据及其付款的实体。

API 客户响应

public class CustomerResponseApi {
    private List<CustomerApi> clientList;
}
public class CustomerApi {
    private int customerId;
    private String documentNumber;
    private String documentType;
    private String firstName;
    private String lastName;
}

API 付款响应

public class PaymentResponseApi {
    private int customerId;
    private LocalDate paymentDate;
    private float amount;
    private String paymentType;
}

最后我应该有这个

CustomerResponse.java

public class CustomerResponse {
    private int customerId;
    private String documentNumber;
    private String documentType;
    private String firstName;
    private String lastName;
    
    private List<PaymentResponseApi> payments;
}

我有一个代理 class 负责进行 API 调用

CustomerProxy.java

public class CustomerProxy {

    @Value("${api.base-url}")
    private String baseUrl;

    public Mono<CustomerResponseApi> getCustomer(String documentType, String documentNumber) {
        log.info("baseUrl: {}", baseUrl);
        WebClient webClient = WebClient.create(baseUrl);

        return webClient.get()
                .uri(uri -> uri
                        .path("/customers")
                        .queryParam("documentNumber", documentNumber)
                        .queryParam("documentType", documentType)
                        .build()
                )
                .retrieve()
                .bodyToMono(CustomerResponseApi.class);
    }
}

PaymentProxy.java

public class PaymentProxy {

    @Value("${api.base-url}")
    private String baseUrl;

    public Flux<PaymentResponseApi> getCustomerPayment(int customerId) {
        log.info("baseUrl: {}", baseUrl);
        WebClient webClient = WebClient.create(baseUrl);

        return webClient.get()
                .uri(uri -> uri
                        .path("/payments")
                        .queryParam("customerId", customerId)
                        .build()
                )
                .retrieve()
                .bodyToFlux(PaymentResponseApi.class);
    }
}

以及负责合并响应的服务 CustomerServiceImpl.java

public class CustomerServiceImpl implements CustomerService {
    
    @Autowired
    private CustomerProxy customerProxy;
    
    @Autowired
    private PaymentProxy paymentProxy;

    @Override
    public Mono<CustomerResponse> getCustomerAndPayments(String documentType, String documentNumber) {
        return customerProxy.getCustomer(documentType, documentNumber).flatMap(resp -> {
            CustomerApi customerApi = resp.getClientList().get(0); //always returns one customer
            
            // Here is my problem, because getCustomerPayment method returns a Flux
            List<PaymentResponseApi> payments = paymentProxy.getCustomerPayment(customerApi.getCustomerId());
            
            CustomerResponseBuilder customerBuilder = CustomerResponse.builder()
                    .customerId(customerApi.getCustomerId())
                    .documentNumber(customerApi.getDocumentNumber())
                    .documentType(customerApi.getDocumentType())
                    .firstName(customerApi.getFirstName())
                    .lastName(customerApi.getLastName())
                    .payments(payments);
            
            return Mono.just(customerBuilder.build());
        });
    }
}

我该怎么办?

两种解决方法:

  1. 使用嵌套地图:
public Mono<CustomerResponse> getCustomerAndPayments(String documentType, String documentNumber) {

    return customerProxy.getCustomer(documentType, documentNumber)
        .map(resp -> resp.getClientList().get(0))
        .flatMap(customerApi -> {
          Flux<PaymentResponseApi> paymentProxyFlux = paymentProxy.getCustomerPayment(customerApi.getCustomerId());
          return paymentProxyFlux.collectList()
              .map(payments -> {
                CustomerResponseBuilder customerBuilder = CustomerResponse.builder()
                    .customerId(customerApi.getCustomerId())
                    .documentNumber(customerApi.getDocumentNumber())
                    .documentType(customerApi.getDocumentType())
                    .firstName(customerApi.getFirstName())
                    .lastName(customerApi.getLastName())
                    .payments(payments);
                return customerBuilder.build();
              });
        });


  }
  1. 使用 zip:由于您在第二个 API 中需要来自第一个 API 的响应的信息,因此您需要将这两个链接在一起。 现在由于它们是异步调用,因此您需要一个 flatMap 或称为 flatMapMany 的 flatMap 变体,它发出多个元素(这正是您的第二个 API 正在做的)。接下来,您需要两个响应来构建您的 CustomerResponse 即您需要 zip 来自两个 API 的两个反应流响应.

因此基本上使用 Method2 你需要这个:

  public Mono<CustomerResponse> getCustomerAndPayments(String documentType, String documentNumber) {


    Mono<CustomerApi> customerApiMono =  customerProxy.getCustomer(documentType, documentNumber)
        .map(resp -> resp.getClientList().get(0));
    Mono<List<PaymentResponseApi>> paymentResponseApiListMono = customerApiMono
        .flatMapMany(customerApi -> paymentProxy.getCustomerPayment(customerApi.getCustomerId()))
        .collectList();

    return customerApiMono.zipWith(paymentResponseApiListMono)
        .map(tuple -> {
          CustomerApi customerApi = tuple.getT1();
          List<PaymentResponseApi> payments = tuple.getT2();
          CustomerResponseBuilder customerBuilder = CustomerResponse.builder()
              .customerId(customerApi.getCustomerId())
              .documentNumber(customerApi.getDocumentNumber())
              .documentType(customerApi.getDocumentType())
              .firstName(customerApi.getFirstName())
              .lastName(customerApi.getLastName())
              .payments(payments);
          return customerBuilder.build();
        });

  }

方法 2 的缺点:Api1 即客户 API 将被订阅两次。

在这种情况下,您可以缓存第一次调用的结果以防止调用 API 两次。

有时,通过创建包装器 class 来不处理元组,使用 zip 运算符也更容易。在这种情况下 CustomerWithPayments :

public class CustomerWithPayments {
    private final CustomerApi customerApi;
    private final List<PaymentResponseApi> paymentResponseApis;
}

缓存 API 结果的解决方案:

public Mono<CustomerResponse> getCustomerAndPayments(String documentType, String documentNumber) {
    Mono<CustomerApi> customerMono = customerProxy.getCustomer(documentType, documentNumber)
            .map(resp -> resp.getClientList().get(0))
            .cache();
    Mono<List<PaymentResponseApi>> paymentResponseMono = customerMono.map(CustomerApi::getCustomerId)
            .flatMapMany(paymentProxy::getCustomerPayment)
            .collectList();

    return customerMono.zipWith(paymentResponseMono, CustomerWithPayments::new)
            .map(customerWithPayments -> {
                CustomerApi customer = customerWithPayments.getCustomerApi();
                List<PaymentResponseApi> payments = customerWithPayments.getPaymentResponseApis();
                return CustomerResponse.builder()
                        .customerId(customer.getCustomerId())
                        .documentNumber(customer.getDocumentNumber())
                        .documentType(customer.getDocumentType())
                        .firstName(customer.getFirstName())
                        .lastName(customer.getLastName())
                        .payments(payments)
                        .build();
            });
}