如何从 WebFlux 中的 Mono<List<T>> 中提取内容以将其传递到调用链中
How to extract content from Mono<List<T>> in WebFlux to pass it down the call chain
我希望能够从 Mono<List<Payload>>
中提取 List<Payload>
以将其传递给下游服务进行处理(或者可能 return 来自 read(RequestParams params)
方法, 而不是 returning void
):
@PostMapping("/subset")
public void read(@RequestBody RequestParams params){
Mono<List<Payload>> result = reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
....
}
其中 reader.read(...)
是自动装配 Spring 服务上的一种方法,它利用 webClient 从外部 Web 服务获取数据 API:
public Mono<List<Payload>> read(String date, String assetClasses, String firmAccounts, String id, String password) {
Flux<Payload> nodes = client
.get()
.uri(uriBuilder -> uriBuilder
.path("/api/subset")
.queryParam("payloads", true)
.queryParam("date", date)
.queryParam("assetClasses", assetClasses)
.queryParam("firmAccounts", firmAccounts)
.build())
.headers(header -> header.setBasicAuth("abc123", "XXXXXXX"))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> {
System.out.println("4xx error");
return Mono.error(new RuntimeException("4xx"));
})
.onStatus(HttpStatus::is5xxServerError, response -> {
System.out.println("5xx error");
return Mono.error(new RuntimeException("5xx"));
})
.bodyToFlux(Payload.class);
Mono<List<Payload>> records = nodes
.collectList();
return records;
}
在 WebFlux 中不允许执行阻塞 result.block()
并抛出异常:
new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread ..." ;
在 WebFlux 中提取 Mono 内容的正确方法是什么?
它是某种 subscribe()
吗?语法是什么?
提前致谢。
没有“正确的方法”,这就是重点。要获得您需要阻止的价值,并且出于多种原因(我现在不会讨论),阻止在 webflux 中是不好的。
你应该做的是 return 发布者一直到调用客户端。
许多人通常难以理解的一件事是 webflux 与生产者(Mono
或 Flux
)和订阅者一起工作。
你的整个服务也是一个生产者,调用的客户端可以看作是订阅者。
将其视为一条长链,从数据源开始,到显示数据的客户端结束。
一个简单的经验法则是,谁是数据的最终消费者就是订阅者,其他人都是生产者。
因此在您的情况下,您只需 return 将 Mono<List<T>
发送给调用客户端。
@PostMapping("/subset")
public Mono<List<Payload>> read(@RequestBody RequestParams params){
Mono<List<Payload>> result = reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
return result;
}
虽然以下 return 日志中 Mono 可观察值的值:
@PostMapping("/subset")
@ResponseBody
public Mono<ResponseEntity<List<Payload>>> read1(@RequestBody RequestParams params){
Mono<List<Payload>> result = reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
return result
.map(e -> new ResponseEntity<List<PayloadByStandardBasis>>(e, HttpStatus.OK));
}
我正在寻求的理解是使用 WebFlux 组成调用链的正确方法,其中一个 operators/legs 的响应(作为 webclient 调用的结果具体化,产生一组记录,如上所述)可以传递到下游到另一个 operator/leg 以促进将这些记录保存在数据库中的副作用,或类似的东西。
将这些步骤中的每一个建模为单独的 REST 端点可能是个好主意,然后为组合操作提供另一个端点,该组合操作在内部以正确的顺序调用每个独立的端点,或者其他设计选择是更喜欢?
这最终是我一直在寻找的理解,所以如果有人想分享示例代码以及意见以更好地实施上述步骤集,我愿意接受最全面的答案。
谢谢。
我希望能够从 Mono<List<Payload>>
中提取 List<Payload>
以将其传递给下游服务进行处理(或者可能 return 来自 read(RequestParams params)
方法, 而不是 returning void
):
@PostMapping("/subset")
public void read(@RequestBody RequestParams params){
Mono<List<Payload>> result = reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
....
}
其中 reader.read(...)
是自动装配 Spring 服务上的一种方法,它利用 webClient 从外部 Web 服务获取数据 API:
public Mono<List<Payload>> read(String date, String assetClasses, String firmAccounts, String id, String password) {
Flux<Payload> nodes = client
.get()
.uri(uriBuilder -> uriBuilder
.path("/api/subset")
.queryParam("payloads", true)
.queryParam("date", date)
.queryParam("assetClasses", assetClasses)
.queryParam("firmAccounts", firmAccounts)
.build())
.headers(header -> header.setBasicAuth("abc123", "XXXXXXX"))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> {
System.out.println("4xx error");
return Mono.error(new RuntimeException("4xx"));
})
.onStatus(HttpStatus::is5xxServerError, response -> {
System.out.println("5xx error");
return Mono.error(new RuntimeException("5xx"));
})
.bodyToFlux(Payload.class);
Mono<List<Payload>> records = nodes
.collectList();
return records;
}
在 WebFlux 中不允许执行阻塞 result.block()
并抛出异常:
new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread ..." ;
在 WebFlux 中提取 Mono 内容的正确方法是什么?
它是某种 subscribe()
吗?语法是什么?
提前致谢。
没有“正确的方法”,这就是重点。要获得您需要阻止的价值,并且出于多种原因(我现在不会讨论),阻止在 webflux 中是不好的。
你应该做的是 return 发布者一直到调用客户端。
许多人通常难以理解的一件事是 webflux 与生产者(Mono
或 Flux
)和订阅者一起工作。
你的整个服务也是一个生产者,调用的客户端可以看作是订阅者。
将其视为一条长链,从数据源开始,到显示数据的客户端结束。
一个简单的经验法则是,谁是数据的最终消费者就是订阅者,其他人都是生产者。
因此在您的情况下,您只需 return 将 Mono<List<T>
发送给调用客户端。
@PostMapping("/subset")
public Mono<List<Payload>> read(@RequestBody RequestParams params){
Mono<List<Payload>> result = reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
return result;
}
虽然以下 return 日志中 Mono 可观察值的值:
@PostMapping("/subset")
@ResponseBody
public Mono<ResponseEntity<List<Payload>>> read1(@RequestBody RequestParams params){
Mono<List<Payload>> result = reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
return result
.map(e -> new ResponseEntity<List<PayloadByStandardBasis>>(e, HttpStatus.OK));
}
我正在寻求的理解是使用 WebFlux 组成调用链的正确方法,其中一个 operators/legs 的响应(作为 webclient 调用的结果具体化,产生一组记录,如上所述)可以传递到下游到另一个 operator/leg 以促进将这些记录保存在数据库中的副作用,或类似的东西。
将这些步骤中的每一个建模为单独的 REST 端点可能是个好主意,然后为组合操作提供另一个端点,该组合操作在内部以正确的顺序调用每个独立的端点,或者其他设计选择是更喜欢?
这最终是我一直在寻找的理解,所以如果有人想分享示例代码以及意见以更好地实施上述步骤集,我愿意接受最全面的答案。
谢谢。