Webflux - return 条件后的通量或错误

Webflux - return Flux or error after a condition

我正在学习使用 webflux 进行反应式编程,为此我正在迁移一些代码。

例如,我正在尝试迁移此方法:

public Set<Vaccine> getAll(Set<Long> vaccinesIds) throws EntityNotFoundException {
        if (null == vaccinesIds) {
            return null;
        }

        Set<Long> vaccinesToFind = new HashSet<>(vaccinesIds);
        vaccinesToFind.remove(null);

        Set<Vaccine> vaccines = new HashSet<>();
        vaccineRepository.findByIdIn(vaccinesToFind).forEach(vaccines::add);

        if (vaccines.size() != vaccinesToFind.size()) {
            LOG.warn("Could not find vaccines with ids: " + vaccinesToFind.removeAll(vaccines.stream().map(Vaccine::getId).collect(Collectors.toSet())));
            throw new EntityNotFoundException(VACCINE_ERROR_NOT_FOUND);
        }

        return vaccines;
    }

总结一下代码,如果存储库 return 请求的所有疫苗应该 return 结果,如果不是应该 return 一个错误。

为此,我想过这样的事情,但没有奏效:

public Flux<Vaccine> getAll(Set<Long> vaccinesIds) {
    if (null == vaccinesIds) {
        return Flux.empty();
    }

    Set<Long> vaccinesToFind = new HashSet<>(vaccinesIds);

    Flux<Vaccine> byIdIn = vaccineRepository.findByIdIn(vaccinesToFind);
        
    Mono<Long> filter = vaccineRepository.findByIdIn(vaccinesToFind).count().filter(x -> x.equals(Long.valueOf(vaccinesToFind.size())));

   return filter.flatMapMany(asd -> vaccineRepository.findByIdIn(vaccinesToFind)
    ).switchIfEmpty(Flux.error((new EntityNotFoundException(VACCINE_ERROR_NOT_FOUND))));
   
}

我做错了什么?

我的第一个疑问是,如果过滤器最后有一个equals方法,为什么它是一个Mono of Long。我的问题是关于评估过滤器以 return 列表或错误。

首先,您多次查询相同的结果vaccineRepository.findByIdIn(vaccinesToFind)。多次查询、传输和反序列化相同的数据。这表明这里有问题。

让我们假设结果集适合内存。那么这个想法就是将 flux 转换为一个普通的集合并决定是否发出错误:

return vaccineRepository.findByIdIn(vaccinesIds)
.collectList()
.flatMapMany(result -> {
    if(result.size() == vaccinesIds.size()) return Flux.fromIterable(result);
    else return Flux.error(new EntityNotFoundException(VACCINE_ERROR_NOT_FOUND));
});

如果结果对于主内存来说太大了,你可以通过第一个查询在数据库中进行计数,在肯定的情况下查询结果。解决方法和你的代码类似:

return vaccineRepository.countByIdIn(vaccinesIds)
.filter(count -> count == vaccinesIds.size())
.flatMapMany($ -> vaccineRepository.findByIdIn(vaccinesIds))
.switchIfEmpty(Mono.error(new EntityNotFoundException(VACCINE_ERROR_NOT_FOUND)));

filter 的结果是 Mono<Long> 因为过滤器只是从上游获取元素并针对给定的谓词进行测试。如果谓词 returns 为假,则该项被过滤掉并且 Mono 为空。要保留测试的所有结果,您可以使用 map 并且类型为 Mono<Boolean>.