如何通过过滤压缩多个 Flux 流

How to zip multiple Flux streams with filtering

我有 2 个源 Flux 流,其中 return 所有关键字和所有词典的流:

Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();

Keyword 引用了一个 Dictionary 对象,如下所示:

public class Keyword {
    private String id;
    private String dictionaryId;
}

目标是转换 Flux<DictionaryTO>,其中包含 Dictionary 的所有属性以及属于字典 的关键字列表

public class DictionaryTO {
    private String id;
    private Collection<KeywordTO> keywords;
}
public class KeywordTO {
    private String id;
}

问题是如何在不阻塞任何源流的情况下以反应方式zip/merge这两个 Flux 流。

注意 keywordFlux 包含 all 个关键词,因此应该根据 Keyword.dictionaryId.

应用一些过滤

根据 boris-the-spider 的建议,我最终使用了 .flatMap().zipWith()

  1. 创建一个 Mono<Map> 个关键字(按 dictionaryId 分组)并缓存它,因为它稍后会被多次使用。
  2. flatMap字典Fluxzip单字字典有上面的关键词图。然后将“字典和关键字映射的元组”映射到带有关键字的字典。

完整解决方案:

Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();

Mono<Map<String, Collection<KeywordTO>>> keywordsMapMono = keywordFlux
    .collectMultimap(KeywordTO::getDictionaryId, k -> keywordTOMapper.map(k))
    .cache(); 

Flux<DictionaryTO> dictionaryTOFlux = dictionaryFlux
    .map(dictionaryTOMapper:map) 
    .flatMap(dic -> Mono.just(dic).zipWith(keywordsMapMono))
    .map(tuple -> {
        Collection<KeywordTO> keywordsForDic = tuple.getT2().get(dic.getId())
        DictionaryTO dic = tuple.getT1();
        dic.setKeywords(keywordsForDic);
        return dic;
    });