在 flatmap 操作之后访问 map 操作中的 Mono 对象
Access Mono object inside map operation after flatmap operation
我正在尝试使用自定义过滤器通过 Spring 云网关创建网关代理路由器。当以阻塞和命令的方式覆盖属性时,一切都按预期工作。
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + newTargetURLHost + )
字符串变量 newTargetURLHost 是通过使用获得的:
newTarget = serviceReturnsMono.getServerMapping(id).block().getHost();
我对 Webflux 还很陌生,但上面这行代码对我来说已经是一种代码味道了。进一步阅读后,这不是使用反应式的最佳方法。我尝试以更 functional/reactive 的方式重写,但未能让反应流发出所需的值。
Mono.just(serviceReturnsMono.getServerMapping(id))
.flatMap(flat -> flat)
.subscribeOn(Schedulers.immediate())
.map(server -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost() ))
.subscribe();
执行上述代码时,exchange 属性不会发生变化。
我也尝试了以下方法但没有成功:
serverMappingMono
.map(serverMapping -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + serverMapping.getHost() ))
.subscribe();
作为测试,当我如下修改代码进行故障排除时,下面确实发出了一个硬编码字符串并且交换属性发生了变化。
.map(server -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + "testHostName" ))
.subscribe();
任何想法或指示将不胜感激。
更新:过滤代码如下:
private ReturnsMonoServerMappingService returnsMonoServerMappingService;
@Override
public int getOrder() {
return 10001;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
final String id = exchange.getRequest().getHeaders().getFirst("reference");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
Mono<ServerMapping> serverMapping = returnsMonoServerMappingService.getServerMapping(id);
serverMapping
.map(server -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost() )))
.subscribe();
}));
}
}
来自 Thomas 的更新解决方案:
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
final String id = exchange.getRequest()
.getHeaders()
.getFirst("reference");
return returnsMonoServerMappingService.getServerMapping(id)
.doOnSuccess(serverMapping -> {
exchange.getAttributes()
.put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost()
}).then(chain.filter(exchange));
}
我忽略的缺失部分是 "doOnSuccess",因为 returnsMonoServerMappingService 已经 returns 一个单声道。然后通过 "then" 链接交换以委托给链中的下一个过滤器。
我正在我的 phone 上写这篇文章,所以无法测试它并且我是凭记忆写的,但我认为它应该是这样的。或者至少你明白了要点。
我们先提取id。然后我们查找服务器映射,如果一切顺利,我们将其作为一个属性,然后我们继续过滤器链。
您几乎不应该在应用程序中订阅,调用客户端通常是订阅者。永远不要阻塞反应性应用程序。
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
final String id = exchange.getRequest()
.getHeaders()
.getFirst("reference");
return returnsMonoServerMappingService.getServerMapping(id)
.doOnSuccess(serverMapping -> {
exchange.getAttributes()
.put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost()
}).then(chain.filter(exchange));
}
我正在尝试使用自定义过滤器通过 Spring 云网关创建网关代理路由器。当以阻塞和命令的方式覆盖属性时,一切都按预期工作。
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + newTargetURLHost + )
字符串变量 newTargetURLHost 是通过使用获得的:
newTarget = serviceReturnsMono.getServerMapping(id).block().getHost();
我对 Webflux 还很陌生,但上面这行代码对我来说已经是一种代码味道了。进一步阅读后,这不是使用反应式的最佳方法。我尝试以更 functional/reactive 的方式重写,但未能让反应流发出所需的值。
Mono.just(serviceReturnsMono.getServerMapping(id))
.flatMap(flat -> flat)
.subscribeOn(Schedulers.immediate())
.map(server -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost() ))
.subscribe();
执行上述代码时,exchange 属性不会发生变化。
我也尝试了以下方法但没有成功:
serverMappingMono
.map(serverMapping -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + serverMapping.getHost() ))
.subscribe();
作为测试,当我如下修改代码进行故障排除时,下面确实发出了一个硬编码字符串并且交换属性发生了变化。
.map(server -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + "testHostName" ))
.subscribe();
任何想法或指示将不胜感激。
更新:过滤代码如下:
private ReturnsMonoServerMappingService returnsMonoServerMappingService;
@Override
public int getOrder() {
return 10001;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
final String id = exchange.getRequest().getHeaders().getFirst("reference");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
Mono<ServerMapping> serverMapping = returnsMonoServerMappingService.getServerMapping(id);
serverMapping
.map(server -> exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost() )))
.subscribe();
}));
}
}
来自 Thomas 的更新解决方案:
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
final String id = exchange.getRequest()
.getHeaders()
.getFirst("reference");
return returnsMonoServerMappingService.getServerMapping(id)
.doOnSuccess(serverMapping -> {
exchange.getAttributes()
.put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost()
}).then(chain.filter(exchange));
}
我忽略的缺失部分是 "doOnSuccess",因为 returnsMonoServerMappingService 已经 returns 一个单声道。然后通过 "then" 链接交换以委托给链中的下一个过滤器。
我正在我的 phone 上写这篇文章,所以无法测试它并且我是凭记忆写的,但我认为它应该是这样的。或者至少你明白了要点。
我们先提取id。然后我们查找服务器映射,如果一切顺利,我们将其作为一个属性,然后我们继续过滤器链。
您几乎不应该在应用程序中订阅,调用客户端通常是订阅者。永远不要阻塞反应性应用程序。
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
final String id = exchange.getRequest()
.getHeaders()
.getFirst("reference");
return returnsMonoServerMappingService.getServerMapping(id)
.doOnSuccess(serverMapping -> {
exchange.getAttributes()
.put(GATEWAY_REQUEST_URL_ATTR, URI.create("https://" + server.getHost()
}).then(chain.filter(exchange));
}