Spring 使用 Webflux 进行响应式编程 - 作为非阻塞流的多个操作

Spring Reactive Programming with Webflux - multiple operations as a non-blocking stream

我有以下代码:

public Flux<Offer> getAllFilteredOffers(Map<String, String> searchParams) {

    Flux<ProductProperties> productProperties = productPropertiesService.findProductPropertiesBySearchCriteria(searchParams);
    Flux<Product> products = productService.findProductsByPropertyId(productProperties);
    Flux<Product> productsByAvailability = productService.getAllProductsByAvailability(products, searchParams);
    Flux<Offer> offers = offerRepository.findByPropertiesIds(productsByAvailability);
    return offers;

此方法:

productService.getAllProductsByAvailability(products, searchParams);

看起来像:

public Flux<Product> getAllProductsByAvailability(Flux<Product> products,
            Map<String, String> searchParams) {

如何将List<Product>传递给getAllProductsByAvailability以保持非阻塞操作? 我读过地图正在阻塞,应该避免。 也许是这样的?

    Flux
                    .just(productPropertiesService.findProductPropertiesBySearchCriteria(searchParams))
                    .flatMap(productProperties -> productService.findProductsByPropertyId(productProperties))
                    .flatMap(products -> productService.getAllProductsByAvailability(Flux.create(products)?????????, searchParams))
???

我不是 Webflux 方面的专家,目前我正在尝试弄清楚如何处理以下问题:我有 Flux 但在第二步中我需要从之前的 Flex<> 对象中提取一些数据 - 保持非阻塞流。

比不过你!

我不知道你在哪里读到的map,但是如果你看一下官方文档Webflux map operator 没有关于阻塞的任何内容,它只是对每个项目使用同步功能。

使用此代码:

productPropertiesService.findProductPropertiesBySearchCriteria(searchParams)
                .flatMap(productProperties -> productService.findProductsByPropertyId(productProperties))
                .collectList()   (1)
                .flatMapMany(products -> productService.getAllProductsByAvailability(Flux.fromIterable(products), searchParams)) (2)

1) 将所有元素收集到 List 并转换为 Mono>

2) 从 List 创建 FLux 并将其作为参数提供,flatMapMany 将 Mono 转换为 Flux