如何从 WebFlux 中的 Mono<List<T>> 中提取内容以将其传递到调用链中

How to extract content from Mono<List<T>> in WebFlux to pass it down the call chain

我希望能够从 Mono<List<Payload>> 中提取 List<Payload> 以将其传递给下游服务进行处理(或者可能 return 来自 read(RequestParams params) 方法, 而不是 returning void):

@PostMapping("/subset")
    public void read(@RequestBody RequestParams params){
        Mono<List<Payload>> result =  reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
        
        ....
    }

其中 reader.read(...) 是自动装配 Spring 服务上的一种方法,它利用 webClient 从外部 Web 服务获取数据 API:

public Mono<List<Payload>> read(String date, String assetClasses, String firmAccounts, String id, String password) {
        Flux<Payload> nodes = client
                .get()
                .uri(uriBuilder -> uriBuilder
                    .path("/api/subset")
                    .queryParam("payloads", true)
                    .queryParam("date", date)
                    .queryParam("assetClasses", assetClasses)
                    .queryParam("firmAccounts", firmAccounts)
                    .build())
                .headers(header -> header.setBasicAuth("abc123", "XXXXXXX"))
        .retrieve()
        .onStatus(HttpStatus::is4xxClientError, response -> {
            System.out.println("4xx error");
            return Mono.error(new RuntimeException("4xx"));
        })
        .onStatus(HttpStatus::is5xxServerError, response -> {
            System.out.println("5xx error");
            return Mono.error(new RuntimeException("5xx"));
                })
        .bodyToFlux(Payload.class);
        
        Mono<List<Payload>> records = nodes
                .collectList();
        
        return records;
    }

在 WebFlux 中不允许执行阻塞 result.block() 并抛出异常:

new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread ..." ;

在 WebFlux 中提取 Mono 内容的正确方法是什么? 它是某种 subscribe() 吗?语法是什么?

提前致谢。

没有“正确的方法”,这就是重点。要获得您需要阻止的价值,并且出于多种原因(我现在不会讨论),阻止在 webflux 中是不好的。

你应该做的是 return 发布者一直到调用客户端。

许多人通常难以理解的一件事是 webflux 与生产者(MonoFlux)和订阅者一起工作。

你的整个服务也是一个生产者,调用的客户端可以看作是订阅者。

将其视为一条长链,从数据源开始,到显示数据的客户端结束。

一个简单的经验法则是,谁是数据的最终消费者就是订阅者,其他人都是生产者。

因此在您的情况下,您只需 return 将 Mono<List<T> 发送给调用客户端。

@PostMapping("/subset")
public Mono<List<Payload>> read(@RequestBody RequestParams params){
    Mono<List<Payload>> result =  reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
        
    return result;
}

虽然以下 return 日志中 Mono 可观察值的值:

@PostMapping("/subset")
@ResponseBody
public  Mono<ResponseEntity<List<Payload>>> read1(@RequestBody RequestParams params){
        Mono<List<Payload>> result =  reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
                
        return result
                 .map(e -> new ResponseEntity<List<PayloadByStandardBasis>>(e, HttpStatus.OK));
        
    }

我正在寻求的理解是使用 WebFlux 组成调用链的正确方法,其中一个 operators/legs 的响应(作为 webclient 调用的结果具体化,产生一组记录,如上所述)可以传递到下游到另一个 operator/leg 以促进将这些记录保存在数据库中的副作用,或类似的东西。

将这些步骤中的每一个建模为单独的 REST 端点可能是个好主意,然后为组合操作提供另一个端点,该组合操作在内部以正确的顺序调用每个独立的端点,或者其他设计选择是更喜欢?

这最终是我一直在寻找的理解,所以如果有人想分享示例代码以及意见以更好地实施上述步骤集,我愿意接受最全面的答案。

谢谢。