我有一组复杂的任务使用 Java Web 客户端请求,这些请求需要并行 运行 并最终阻塞到 return 单个响应

I have a complicated set of tasks using Java Web-Client requests that need to run in parallel and finally block to return a single response

我是 Web 客户端响应式库的新手。

这是我的问题:

它从用户向 post 一包文件提交请求开始。他们等待回应。

使用此请求的服务需要运行 并行处理多个任务。每个任务中的一些子任务必须先完成(2 个不同的 Get 请求),然后才能尝试最后一个子任务,即主要 Post 请求。然后我想等待所有任务的收集'Post sub-tasks'完成(代表数据包),然后收集并协调响应。

我需要在最后协调并确保整个并行过程成功(发送数据包),然后响应服务器(用户)指示该过程是否成功。

我的伪流程:

最后评估从 Post 请求返回的所有字符串响应,并确保响应的数量与 post 发送到服务器的文档数量相匹配。跟踪任何错误。如果任何 Get 或 Post 请求失败,记录错误。

我想出了如何执行所有这些步骤 运行在每个子任务上使用 block() 'Get request' 然后在主任务上使用 block() 'Post request',但是我担心使用这种方法性能会受到影响。

我需要有关如何使用 Web 客户端和反应式非阻塞并行进程生成流的帮助。

感谢您的帮助。

'恐怕使用这种方法性能会受到影响。' - 你是对的。毕竟,使用 WebFlux 的全部目的是创建一个非阻塞应用程序。

我已经尝试模拟大部分逻辑。我希望您能将解决方案与您的用例相关联。

@RestController
public class MyController {

  @Autowired
  private WebClient webClient;

  @PostMapping(value = "/postPacketOfDocs")
  public Mono<ResponseEntity<String>> upload(@RequestBody Flux<String> documentAndMetaDataList) {

    return documentAndMetaDataList
        .flatMap(documentAndMetaData -> {
          //do File I/O
          return getDocumentAndMetadata(documentAndMetaData);
        })
        .map(String::getBytes) //Read File to get a Byte array
        .flatMap(fileBytes -> {
          return webClient.post().uri("/send/byte/and/metadata")
              .retrieve().bodyToMono(String.class);
        })
        .collectList()
        .flatMap(allResponsesFromEachPOST -> {
          //Do some validation 
          boolean allValidationsSuccessful = true;
          if (allValidationsSuccessful) {
            return Mono.just("Success");
          } else {
            return Mono.error(new RuntimeException()); //some custom exception which can be handled by @ExceptionHandler
          }
        })
        .flatMap(msg -> Mono.just(ResponseEntity.ok().body(msg)));

  }

  private Mono<String> getDocumentAndMetadata(String documentAndMetaData) {
    String metadata = "";//get metadata object from documentAndMetaData
    Mono<String> keyAResponse = webClient.get().uri("/get/keyA").retrieve().bodyToMono(String.class);
    Mono<String> keyBResponse = webClient.get().uri("/get/keyB").retrieve().bodyToMono(String.class);
    return keyAResponse.concatWith(keyBResponse)
        .collectList()
        .flatMap(responses -> updateMetadata(responses, metadata));
  }

  private Mono<String> updateMetadata(List<String> responses, String metadata) {
    String newMedataData = metadata + responses.get(0) + responses.get(1); //some update logic
    return Mono.just(newMedataData);
  }
}