我有一组复杂的任务使用 Java Web 客户端请求,这些请求需要并行 运行 并最终阻塞到 return 单个响应
I have a complicated set of tasks using Java Web-Client requests that need to run in parallel and finally block to return a single response
我是 Web 客户端响应式库的新手。
这是我的问题:
它从用户向 post 一包文件提交请求开始。他们等待回应。
使用此请求的服务需要运行 并行处理多个任务。每个任务中的一些子任务必须先完成(2 个不同的 Get 请求),然后才能尝试最后一个子任务,即主要 Post 请求。然后我想等待所有任务的收集'Post sub-tasks'完成(代表数据包),然后收集并协调响应。
我需要在最后协调并确保整个并行过程成功(发送数据包),然后响应服务器(用户)指示该过程是否成功。
我的伪流程:
- 创建一组文档,一次一个 post 到服务器。一个数据包最多可包含 10 个文件。 (文档和元数据列表)。最初每个文档都会包含一些预填充的已知值,例如文件路径和文档名称。
- 对于数据包中的每个文档:(运行 并行)
- 我需要执行文件 I/O 并创建一个元数据对象 - 将其命名为 getDocumentAndMetadata。要创建元数据对象,我必须做一些
getDocumentAndMetadata 中的第一步:
- 执行获取请求以获取密钥 A - 将其称为 getKeyA(requestA)
- 执行获取请求以获取密钥 B - 将其称为 getKeyB(requestB)
- 合并密钥 A 和密钥 B 请求并使用来自这些请求的响应来更新元数据对象。
- 然后读取文件以获取字节数组 - 将其命名为 getFile
- 然后将字节数组(文档)和元数据对象传递给函数:
- 向服务器发送 Http Post,在 post 请求中发送字节数组和元数据对象。
- 累积每个 Post 的响应,它们是字符串。
- 然后阻塞,直到发送完所有文件。
最后评估从 Post 请求返回的所有字符串响应,并确保响应的数量与 post 发送到服务器的文档数量相匹配。跟踪任何错误。如果任何 Get 或 Post 请求失败,记录错误。
我想出了如何执行所有这些步骤 运行在每个子任务上使用 block() 'Get request' 然后在主任务上使用 block() 'Post request',但是我担心使用这种方法性能会受到影响。
我需要有关如何使用 Web 客户端和反应式非阻塞并行进程生成流的帮助。
感谢您的帮助。
'恐怕使用这种方法性能会受到影响。' - 你是对的。毕竟,使用 WebFlux 的全部目的是创建一个非阻塞应用程序。
我已经尝试模拟大部分逻辑。我希望您能将解决方案与您的用例相关联。
@RestController
public class MyController {
@Autowired
private WebClient webClient;
@PostMapping(value = "/postPacketOfDocs")
public Mono<ResponseEntity<String>> upload(@RequestBody Flux<String> documentAndMetaDataList) {
return documentAndMetaDataList
.flatMap(documentAndMetaData -> {
//do File I/O
return getDocumentAndMetadata(documentAndMetaData);
})
.map(String::getBytes) //Read File to get a Byte array
.flatMap(fileBytes -> {
return webClient.post().uri("/send/byte/and/metadata")
.retrieve().bodyToMono(String.class);
})
.collectList()
.flatMap(allResponsesFromEachPOST -> {
//Do some validation
boolean allValidationsSuccessful = true;
if (allValidationsSuccessful) {
return Mono.just("Success");
} else {
return Mono.error(new RuntimeException()); //some custom exception which can be handled by @ExceptionHandler
}
})
.flatMap(msg -> Mono.just(ResponseEntity.ok().body(msg)));
}
private Mono<String> getDocumentAndMetadata(String documentAndMetaData) {
String metadata = "";//get metadata object from documentAndMetaData
Mono<String> keyAResponse = webClient.get().uri("/get/keyA").retrieve().bodyToMono(String.class);
Mono<String> keyBResponse = webClient.get().uri("/get/keyB").retrieve().bodyToMono(String.class);
return keyAResponse.concatWith(keyBResponse)
.collectList()
.flatMap(responses -> updateMetadata(responses, metadata));
}
private Mono<String> updateMetadata(List<String> responses, String metadata) {
String newMedataData = metadata + responses.get(0) + responses.get(1); //some update logic
return Mono.just(newMedataData);
}
}
我是 Web 客户端响应式库的新手。
这是我的问题:
它从用户向 post 一包文件提交请求开始。他们等待回应。
使用此请求的服务需要运行 并行处理多个任务。每个任务中的一些子任务必须先完成(2 个不同的 Get 请求),然后才能尝试最后一个子任务,即主要 Post 请求。然后我想等待所有任务的收集'Post sub-tasks'完成(代表数据包),然后收集并协调响应。
我需要在最后协调并确保整个并行过程成功(发送数据包),然后响应服务器(用户)指示该过程是否成功。
我的伪流程:
- 创建一组文档,一次一个 post 到服务器。一个数据包最多可包含 10 个文件。 (文档和元数据列表)。最初每个文档都会包含一些预填充的已知值,例如文件路径和文档名称。
- 对于数据包中的每个文档:(运行 并行)
- 我需要执行文件 I/O 并创建一个元数据对象 - 将其命名为 getDocumentAndMetadata。要创建元数据对象,我必须做一些
getDocumentAndMetadata 中的第一步:
- 执行获取请求以获取密钥 A - 将其称为 getKeyA(requestA)
- 执行获取请求以获取密钥 B - 将其称为 getKeyB(requestB)
- 合并密钥 A 和密钥 B 请求并使用来自这些请求的响应来更新元数据对象。
- 然后读取文件以获取字节数组 - 将其命名为 getFile
- 然后将字节数组(文档)和元数据对象传递给函数:
- 向服务器发送 Http Post,在 post 请求中发送字节数组和元数据对象。
- 累积每个 Post 的响应,它们是字符串。
- 然后阻塞,直到发送完所有文件。
- 我需要执行文件 I/O 并创建一个元数据对象 - 将其命名为 getDocumentAndMetadata。要创建元数据对象,我必须做一些
getDocumentAndMetadata 中的第一步:
- 对于数据包中的每个文档:(运行 并行)
最后评估从 Post 请求返回的所有字符串响应,并确保响应的数量与 post 发送到服务器的文档数量相匹配。跟踪任何错误。如果任何 Get 或 Post 请求失败,记录错误。
我想出了如何执行所有这些步骤 运行在每个子任务上使用 block() 'Get request' 然后在主任务上使用 block() 'Post request',但是我担心使用这种方法性能会受到影响。
我需要有关如何使用 Web 客户端和反应式非阻塞并行进程生成流的帮助。
感谢您的帮助。
'恐怕使用这种方法性能会受到影响。' - 你是对的。毕竟,使用 WebFlux 的全部目的是创建一个非阻塞应用程序。
我已经尝试模拟大部分逻辑。我希望您能将解决方案与您的用例相关联。
@RestController
public class MyController {
@Autowired
private WebClient webClient;
@PostMapping(value = "/postPacketOfDocs")
public Mono<ResponseEntity<String>> upload(@RequestBody Flux<String> documentAndMetaDataList) {
return documentAndMetaDataList
.flatMap(documentAndMetaData -> {
//do File I/O
return getDocumentAndMetadata(documentAndMetaData);
})
.map(String::getBytes) //Read File to get a Byte array
.flatMap(fileBytes -> {
return webClient.post().uri("/send/byte/and/metadata")
.retrieve().bodyToMono(String.class);
})
.collectList()
.flatMap(allResponsesFromEachPOST -> {
//Do some validation
boolean allValidationsSuccessful = true;
if (allValidationsSuccessful) {
return Mono.just("Success");
} else {
return Mono.error(new RuntimeException()); //some custom exception which can be handled by @ExceptionHandler
}
})
.flatMap(msg -> Mono.just(ResponseEntity.ok().body(msg)));
}
private Mono<String> getDocumentAndMetadata(String documentAndMetaData) {
String metadata = "";//get metadata object from documentAndMetaData
Mono<String> keyAResponse = webClient.get().uri("/get/keyA").retrieve().bodyToMono(String.class);
Mono<String> keyBResponse = webClient.get().uri("/get/keyB").retrieve().bodyToMono(String.class);
return keyAResponse.concatWith(keyBResponse)
.collectList()
.flatMap(responses -> updateMetadata(responses, metadata));
}
private Mono<String> updateMetadata(List<String> responses, String metadata) {
String newMedataData = metadata + responses.get(0) + responses.get(1); //some update logic
return Mono.just(newMedataData);
}
}