Mono 未按计划执行
Mono not executing with schedule
我有一个使用以下方法的 Spring webflux 应用程序。
@Override
public Mono<Integer> updateSetting(int orgId, IntegrationDto dto,
Map<String, Object> jsonMap) {
return retrieveServices(dto.getClientId()).flatMap(services -> {
jsonMap.put("service", services);
return categoryRepository.findCategoryIdCountByName("test", orgId)
.flatMap(categoryIdCount -> {
final ServiceDto serviceInput = new ServiceDto();
if (categoryIdCount == 0) {
return inventoryCategoryRepository.save(InventoryCategory.of("test", orgId))
.flatMap(category -> {
return saveServices(serviceInput, orgId, jsonMap,
category.getCategoryId());
});
} else {
// Some Logc here ...
}
});
}).onErrorResume(e -> {
if (e instanceof WebClientResponseException) {
int statusCode = ((WebClientResponseException) e).getRawStatusCode();
throw new LabServiceException("Unable to connect to the service !", statusCode);
}
throw new ServiceException("Error connecting to the service !");
});
}
private Mono<Services> retrieveServices(final String clientId) {
return webClient.get().uri(props.getBaseUrl() + "/api/v1/services")
.retrieve().bodyToMono(Services.class);
}
private Mono<Integer> saveInventories(ServiceInput serviceInput, int orgId, Map<String, Object> jsonMap,
Long categoryId) {
return refreshInventories(serviceInput, orgId, categoryId).flatMap(reponse -> {
return updateSetting(branchId, jsonMap);
});
}
private Mono<Integer> refreshInventories(ServiceInput serviceInput, int orgId, Long categoryId) {
return inventoryRepository.findAllCodesByTypeBranchId(branchId).collectList().flatMap(codes -> {
return retrieveAvailableServices(Optional.of(serviceInput), categoryId).flatMap(services -> {
List<Inventory> inventories = services.stream()
.filter(inventory -> !codes.contains(inventory.getCode()))
.map(inventoryDto -> toInventory(inventoryDto, branchId)).collect(Collectors.toList());
if (inventories.size() > 0) {
return saveAllInventories(inventories).flatMap(response -> {
return Mono.just(orgId);
});
} else {
return Mono.just(orgId);
}
});
});
}
此处,正在从 REST 调用中调用 updateSettig public 方法,所有方法都按预期执行。
现在,我想像调度程序一样用不同的流程执行相同的操作。
当我也从调度程序调用时,它起作用了。
updateSetting(orgId, dto, jsonMap).subscribe();
但是,我想等到 updateSetting 被执行。
所以,尝试使用下面的代码。
updateSetting(orgId, dto, jsonMap).flatMap(response -> {
////
});
使用上面的代码,调用了 updateSetting 方法,但没有进入 retrieveServices。
return retrieveServices(dto.getClientId()).flatMap(services -> {
最后总是要订阅的。所以你的代码应该是:
updateSetting(orgId, dto, jsonMap).flatMap(response -> {
////
}).subscribe();
我有一个使用以下方法的 Spring webflux 应用程序。
@Override
public Mono<Integer> updateSetting(int orgId, IntegrationDto dto,
Map<String, Object> jsonMap) {
return retrieveServices(dto.getClientId()).flatMap(services -> {
jsonMap.put("service", services);
return categoryRepository.findCategoryIdCountByName("test", orgId)
.flatMap(categoryIdCount -> {
final ServiceDto serviceInput = new ServiceDto();
if (categoryIdCount == 0) {
return inventoryCategoryRepository.save(InventoryCategory.of("test", orgId))
.flatMap(category -> {
return saveServices(serviceInput, orgId, jsonMap,
category.getCategoryId());
});
} else {
// Some Logc here ...
}
});
}).onErrorResume(e -> {
if (e instanceof WebClientResponseException) {
int statusCode = ((WebClientResponseException) e).getRawStatusCode();
throw new LabServiceException("Unable to connect to the service !", statusCode);
}
throw new ServiceException("Error connecting to the service !");
});
}
private Mono<Services> retrieveServices(final String clientId) {
return webClient.get().uri(props.getBaseUrl() + "/api/v1/services")
.retrieve().bodyToMono(Services.class);
}
private Mono<Integer> saveInventories(ServiceInput serviceInput, int orgId, Map<String, Object> jsonMap,
Long categoryId) {
return refreshInventories(serviceInput, orgId, categoryId).flatMap(reponse -> {
return updateSetting(branchId, jsonMap);
});
}
private Mono<Integer> refreshInventories(ServiceInput serviceInput, int orgId, Long categoryId) {
return inventoryRepository.findAllCodesByTypeBranchId(branchId).collectList().flatMap(codes -> {
return retrieveAvailableServices(Optional.of(serviceInput), categoryId).flatMap(services -> {
List<Inventory> inventories = services.stream()
.filter(inventory -> !codes.contains(inventory.getCode()))
.map(inventoryDto -> toInventory(inventoryDto, branchId)).collect(Collectors.toList());
if (inventories.size() > 0) {
return saveAllInventories(inventories).flatMap(response -> {
return Mono.just(orgId);
});
} else {
return Mono.just(orgId);
}
});
});
}
此处,正在从 REST 调用中调用 updateSettig public 方法,所有方法都按预期执行。
现在,我想像调度程序一样用不同的流程执行相同的操作。
当我也从调度程序调用时,它起作用了。
updateSetting(orgId, dto, jsonMap).subscribe();
但是,我想等到 updateSetting 被执行。
所以,尝试使用下面的代码。
updateSetting(orgId, dto, jsonMap).flatMap(response -> {
////
});
使用上面的代码,调用了 updateSetting 方法,但没有进入 retrieveServices。
return retrieveServices(dto.getClientId()).flatMap(services -> {
最后总是要订阅的。所以你的代码应该是:
updateSetting(orgId, dto, jsonMap).flatMap(response -> {
////
}).subscribe();