如何在列表上并行进行潜在的数据库交互操作
How to make operation with potential DB interactions in parallel on list
我试图让我的代码更有效地工作,所以我试图了解如何让它与 Futures 和 ForkJoinPool 一起工作。
现在我有这样工作的代码:
@RestController
@RequestMapping(SEND)
@Slf4j
public class InputMessageController {
private HandlerService service;
@ApiResponses(value = {
@ApiResponse(code = 200, message = "message sent"),
@ApiResponse(code = 404, message = "channel not found or inactive"),
})
@RequestMapping(value = "/{channel}", method = RequestMethod.POST)
@ResponseStatus(HttpStatus.OK)
public List<ResponseDto> sendNotification(
@Valid @RequestBody @NotNull List<PayloadInfoDto> requestDtoList,
@PathVariable("channel") String url) throws InputChannelInactiveException {
InputChannel channel = mapService.getChannelByUrl(url);
if (channel != null){
return service.processing(requestDtoList, channel);
} else {
loggerService.logChannelNotFound()
throw new InputChannelInactiveException(url);
}
}
}
public class HandlerServiceImpl implements HandlerService {
public List<ResponseDto> processing(List<PayloadInfoDto> requestDtoList, InputChannel channel) {
List<ResponseDto> responseDtoList = new ArrayList<>();
//this one takes quite long. would be better if it was done in multiple threads
requestDtoList.forEach(inputRequestDto -> responseDtoList.add(processNorm(inputRequestDto, channel)));
return responseDtoList;
}
}
processNorm(PayloadInfoDto inputRequestDto, InputChannel channel) {
RequestMessage msg;
//call to multiple services which can throw an exception. Each exception is processed.
//call to logger service which logs info to database
}
我的问题是:
- 对于与数据库一起运行的记录器服务,写入而不是定期保存到数据库会更好吗,特别是如果我们不想等待数据库写入并想继续?
private ExecutorService executor = Executors.newWorkStealingPool();
///....some code
public void exampleAsyncLog(){
//I don't wan't to wait while logger service writes to DB. It can take as long as it wants to write,
//while I'll move on
executor.submit(() -> saveSomethingToDb()); //saveSomethingToDb() is trivial logRepository.save(newEntity)
}
- 我还没想好如何 运行 CompletableFuture/ForkJoinPool 加入列表。至少在某种程度上 a) 将列表作为参数 b) 并行处理列表中的每个单元 c) 最终结果也是列表。
我遇到的所有示例都是通过制作 CompletableFuture 列表然后执行 allOf() like in this blogpost 来操作的。使用期货从列表中生成结果列表的正确方法是什么?或者有更好的方法吗?
我会说你使用 CompletableFuture 来完成这个任务,因为你只想在你的所有任务都保存在对你的任务最安全的数据库中时才继续。所以代码如下:
public class HandlerServiceImpl implements HandlerService {
public List<ResponseDto> processing(List<PayloadInfoDto> requestDtoList, InputChannel channel) {
List<ResponseDto> responseDtoList = new ArrayList<>();
//this one takes quite long. would be better if it was done in multiple threads
CompletableFuture<List<ResponseDto>> future
= CompletableFuture.supplyAsync(() -> requestDtoList.stream().map(inputRequestDto -> processNorm(inputRequestDto, channel)).collect(Collectors.toList())); //This code is not tested. It's the logic that I am showing you to implement that will work for you.
return future.get();//wait till all the operation is completed.
}
}
我试图让我的代码更有效地工作,所以我试图了解如何让它与 Futures 和 ForkJoinPool 一起工作。
现在我有这样工作的代码:
@RestController
@RequestMapping(SEND)
@Slf4j
public class InputMessageController {
private HandlerService service;
@ApiResponses(value = {
@ApiResponse(code = 200, message = "message sent"),
@ApiResponse(code = 404, message = "channel not found or inactive"),
})
@RequestMapping(value = "/{channel}", method = RequestMethod.POST)
@ResponseStatus(HttpStatus.OK)
public List<ResponseDto> sendNotification(
@Valid @RequestBody @NotNull List<PayloadInfoDto> requestDtoList,
@PathVariable("channel") String url) throws InputChannelInactiveException {
InputChannel channel = mapService.getChannelByUrl(url);
if (channel != null){
return service.processing(requestDtoList, channel);
} else {
loggerService.logChannelNotFound()
throw new InputChannelInactiveException(url);
}
}
}
public class HandlerServiceImpl implements HandlerService {
public List<ResponseDto> processing(List<PayloadInfoDto> requestDtoList, InputChannel channel) {
List<ResponseDto> responseDtoList = new ArrayList<>();
//this one takes quite long. would be better if it was done in multiple threads
requestDtoList.forEach(inputRequestDto -> responseDtoList.add(processNorm(inputRequestDto, channel)));
return responseDtoList;
}
}
processNorm(PayloadInfoDto inputRequestDto, InputChannel channel) {
RequestMessage msg;
//call to multiple services which can throw an exception. Each exception is processed.
//call to logger service which logs info to database
}
我的问题是:
- 对于与数据库一起运行的记录器服务,写入而不是定期保存到数据库会更好吗,特别是如果我们不想等待数据库写入并想继续?
private ExecutorService executor = Executors.newWorkStealingPool();
///....some code
public void exampleAsyncLog(){
//I don't wan't to wait while logger service writes to DB. It can take as long as it wants to write,
//while I'll move on
executor.submit(() -> saveSomethingToDb()); //saveSomethingToDb() is trivial logRepository.save(newEntity)
}
- 我还没想好如何 运行 CompletableFuture/ForkJoinPool 加入列表。至少在某种程度上 a) 将列表作为参数 b) 并行处理列表中的每个单元 c) 最终结果也是列表。 我遇到的所有示例都是通过制作 CompletableFuture 列表然后执行 allOf() like in this blogpost 来操作的。使用期货从列表中生成结果列表的正确方法是什么?或者有更好的方法吗?
我会说你使用 CompletableFuture 来完成这个任务,因为你只想在你的所有任务都保存在对你的任务最安全的数据库中时才继续。所以代码如下:
public class HandlerServiceImpl implements HandlerService {
public List<ResponseDto> processing(List<PayloadInfoDto> requestDtoList, InputChannel channel) {
List<ResponseDto> responseDtoList = new ArrayList<>();
//this one takes quite long. would be better if it was done in multiple threads
CompletableFuture<List<ResponseDto>> future
= CompletableFuture.supplyAsync(() -> requestDtoList.stream().map(inputRequestDto -> processNorm(inputRequestDto, channel)).collect(Collectors.toList())); //This code is not tested. It's the logic that I am showing you to implement that will work for you.
return future.get();//wait till all the operation is completed.
}
}