仅保存来自 Flux<> 的唯一元素

Save only unique elements from Flux<>

我对反应式编程还很陌生,我很难理解如何实现这样简单的逻辑:

所以,在我的代码中,我有一个带地址的 Flux<>,我需要将缺失的地址添加到数据库

Flux<StreetAddressDto> parseDictionaries = Flux.defer(() ->
    csvParser.parseFile(fileName)
    .doOnNext(streetAddressDto -> {
      log.info("PROCESSING {}", streetAddressDto);
      dictionaryService.updateDictionaries(streetAddressDto);
    })

dictionaryService.updateDictionaries() 中,我有在 Postgres table 中查找地址的逻辑。如果值还没有出现,那么我保存这个新值。

Mono<TownAddressEntity> townAddressEntityMono = townAddressRepository.findByTownName(townAddress.getTownName())
                .switchIfEmpty(townAddressRepository.save(townAddress));

当我的 Flux 中有两个相同的地址元素时出现问题。 为两个元素调用方法 townAddressRepository.findByTownName(),返回空结果,然后也为两个地址调用 townAddressRepository.save(townAddress)。所以执行后我将在数据库中有两个相同的地址。但是我只需要保存一个唯一的地址。
谁能帮助我了解如何解决此问题?

你可以做类似的事情。 如果在 fb 中找到地址,则执行 Mono.emtpy 这将不会继续流,如果未找到,则发出元素,以便流继续,然后保存在 db

  public void run(String... args) throws Exception {
Flux.just("address1","address2","address3")
    .flatMap(s -> getIfNotInDb(s))
    .flatMap(s -> saveInDb(s))
    
}


  Mono<String> getIfNotInDb(String address){
return   findInDb(address)
      .hasElement()
      .flatMap(aBoolean -> aBoolean?Mono.empty():Mono.just(address));
     
  }