如何使用 SpringBoot Webflux + R2dbc 应用程序在一次调用中实现数据库更新查询列表
How to implement a list of DB update queries in one call with SpringBoot Webflux + R2dbc application
我的 springBoot webflux r2dbc 应用程序的目标是控制器接受一个包含 list DB UPDATE 或 INSERT 详细信息的请求,并响应返回结果摘要。
我可以编写一个基于 ReactiveCrudRepository 的存储库来实现每个数据库操作。但是我不知道如何编写服务来对数据库操作列表的执行进行分组并组成结果摘要响应。
我是 java 响应式编程的新手。感谢您的任何建议和帮助。
陈
我从这里得到提示:https://www.vinsguru.com/spring-webflux-aggregation/。想法是:
- 根据请求创建 3 个 Monos
- Mono
monoEndDateSet -- 更新操作的数据库行 ID;
- Mono
monoCreateList -- 新插入的数据库行 ID;
- Mono monoRespFilled -- 部分填充一些已知字段;
- 使用Mono.zip聚合3个monos,将Tuple3映射并聚合到Mono到return。
以下是代码的关键部分:
public Mono<ChangeSupplyResponse> ChangeSupplies(ChangeSupplyRequest csr){
ChangeSupplyResponse resp = ChangeSupplyResponse.builder().build();
resp.setEventType(csr.getEventType());
resp.setSupplyOperationId(csr.getSupplyOperationId());
resp.setTeamMemberId(csr.getTeamMemberId());
resp.setRequestTimeStamp(csr.getTimestamp());
resp.setProcessStart(OffsetDateTime.now());
resp.setUserId(csr.getUserId());
Mono<List<Long>> monoEndDateSet = getEndDateIdList(csr);
Mono<List<Long>> monoCreateList = getNewSupplyEntityList(csr);
Mono<ChangeSupplyResponse> monoRespFilled = Mono.just(resp);
return Mono.zip(monoRespFilled, monoEndDateSet, monoCreateList).map(this::combine).as(operator::transactional);
}
private ChangeSupplyResponse combine(Tuple3<ChangeSupplyResponse, List<Long>, List<Long>> tuple){
ChangeSupplyResponse resp = tuple.getT1().toBuilder().build();
List<Long> endDateIds = tuple.getT2();
resp.setEndDatedDemandStreamSupplyIds(endDateIds);
List<Long> newIds = tuple.getT3();
resp.setNewCreatedDemandStreamSupplyIds(newIds);
resp.setSuccess(true);
Duration span = Duration.between(resp.getProcessStart(), OffsetDateTime.now());
resp.setProcessDurationMillis(span.toMillis());
return resp;
}
private Mono<List<Long>> getNewSupplyEntityList(ChangeSupplyRequest csr) {
Flux<DemandStreamSupplyEntity> fluxNewCreated = Flux.empty();
for (SrmOperation so : csr.getOperations()) {
if (so.getType() == SrmOperationType.createSupply) {
DemandStreamSupplyEntity e = buildEntity(so, csr);
fluxNewCreated = fluxNewCreated.mergeWith(this.demandStreamSupplyRepository.save(e));
}
}
return fluxNewCreated.map(e -> e.getDemandStreamSupplyId()).collectList();
}
...
我的 springBoot webflux r2dbc 应用程序的目标是控制器接受一个包含 list DB UPDATE 或 INSERT 详细信息的请求,并响应返回结果摘要。
我可以编写一个基于 ReactiveCrudRepository 的存储库来实现每个数据库操作。但是我不知道如何编写服务来对数据库操作列表的执行进行分组并组成结果摘要响应。
我是 java 响应式编程的新手。感谢您的任何建议和帮助。
陈
我从这里得到提示:https://www.vinsguru.com/spring-webflux-aggregation/。想法是:
- 根据请求创建 3 个 Monos
- Mono
- monoEndDateSet -- 更新操作的数据库行 ID;
- Mono
- monoCreateList -- 新插入的数据库行 ID;
- Mono monoRespFilled -- 部分填充一些已知字段;
- 使用Mono.zip聚合3个monos,将Tuple3映射并聚合到Mono到return。
以下是代码的关键部分:
public Mono<ChangeSupplyResponse> ChangeSupplies(ChangeSupplyRequest csr){
ChangeSupplyResponse resp = ChangeSupplyResponse.builder().build();
resp.setEventType(csr.getEventType());
resp.setSupplyOperationId(csr.getSupplyOperationId());
resp.setTeamMemberId(csr.getTeamMemberId());
resp.setRequestTimeStamp(csr.getTimestamp());
resp.setProcessStart(OffsetDateTime.now());
resp.setUserId(csr.getUserId());
Mono<List<Long>> monoEndDateSet = getEndDateIdList(csr);
Mono<List<Long>> monoCreateList = getNewSupplyEntityList(csr);
Mono<ChangeSupplyResponse> monoRespFilled = Mono.just(resp);
return Mono.zip(monoRespFilled, monoEndDateSet, monoCreateList).map(this::combine).as(operator::transactional);
}
private ChangeSupplyResponse combine(Tuple3<ChangeSupplyResponse, List<Long>, List<Long>> tuple){
ChangeSupplyResponse resp = tuple.getT1().toBuilder().build();
List<Long> endDateIds = tuple.getT2();
resp.setEndDatedDemandStreamSupplyIds(endDateIds);
List<Long> newIds = tuple.getT3();
resp.setNewCreatedDemandStreamSupplyIds(newIds);
resp.setSuccess(true);
Duration span = Duration.between(resp.getProcessStart(), OffsetDateTime.now());
resp.setProcessDurationMillis(span.toMillis());
return resp;
}
private Mono<List<Long>> getNewSupplyEntityList(ChangeSupplyRequest csr) {
Flux<DemandStreamSupplyEntity> fluxNewCreated = Flux.empty();
for (SrmOperation so : csr.getOperations()) {
if (so.getType() == SrmOperationType.createSupply) {
DemandStreamSupplyEntity e = buildEntity(so, csr);
fluxNewCreated = fluxNewCreated.mergeWith(this.demandStreamSupplyRepository.save(e));
}
}
return fluxNewCreated.map(e -> e.getDemandStreamSupplyId()).collectList();
}
...