Mono 未按计划执行

Mono not executing with schedule

我有一个使用以下方法的 Spring webflux 应用程序。

@Override
public Mono<Integer> updateSetting(int orgId, IntegrationDto dto,
        Map<String, Object> jsonMap) {
    return retrieveServices(dto.getClientId()).flatMap(services -> {
        jsonMap.put("service", services);
        return categoryRepository.findCategoryIdCountByName("test", orgId)
                .flatMap(categoryIdCount -> {
                    final ServiceDto serviceInput = new ServiceDto();
                    if (categoryIdCount == 0) {
                        return inventoryCategoryRepository.save(InventoryCategory.of("test", orgId))
                                .flatMap(category -> {
                                    return saveServices(serviceInput, orgId, jsonMap,
                                            category.getCategoryId());
                                });
                    } else {
                        // Some Logc here ...
                    }
                });
    }).onErrorResume(e -> {
        if (e instanceof WebClientResponseException) {
            int statusCode = ((WebClientResponseException) e).getRawStatusCode();
            throw new LabServiceException("Unable to connect to the service !", statusCode);
        }
        throw new ServiceException("Error connecting to the service !");
    });
}

private Mono<Services> retrieveServices(final String clientId) {
    return webClient.get().uri(props.getBaseUrl() + "/api/v1/services")
            .retrieve().bodyToMono(Services.class);
}

private Mono<Integer> saveInventories(ServiceInput serviceInput, int orgId, Map<String, Object> jsonMap,
        Long categoryId) {
    return refreshInventories(serviceInput, orgId, categoryId).flatMap(reponse -> {
        return updateSetting(branchId, jsonMap);
    });
}


private Mono<Integer> refreshInventories(ServiceInput serviceInput, int orgId, Long categoryId) {
    return inventoryRepository.findAllCodesByTypeBranchId(branchId).collectList().flatMap(codes -> {
        return retrieveAvailableServices(Optional.of(serviceInput), categoryId).flatMap(services -> {
            List<Inventory> inventories = services.stream()
                    .filter(inventory -> !codes.contains(inventory.getCode()))
                    .map(inventoryDto -> toInventory(inventoryDto, branchId)).collect(Collectors.toList());
            if (inventories.size() > 0) {
                return saveAllInventories(inventories).flatMap(response -> {
                    return Mono.just(orgId);
                });
            } else {
                return Mono.just(orgId);
            }
        });
    });
}

此处,正在从 REST 调用中调用 updateSettig public 方法,所有方法都按预期执行。

现在,我想像调度程序一样用不同的流程执行相同的操作。

当我也从调度程序调用时,它起作用了。

updateSetting(orgId, dto, jsonMap).subscribe();

但是,我想等到 updateSetting 被执行。

所以,尝试使用下面的代码。

updateSetting(orgId, dto, jsonMap).flatMap(response -> {
  ////
});

使用上面的代码,调用了 updateSetting 方法,但没有进入 retrieveServices。

return retrieveServices(dto.getClientId()).flatMap(services -> {

最后总是要订阅的。所以你的代码应该是:

updateSetting(orgId, dto, jsonMap).flatMap(response -> {
  ////
}).subscribe();