在 Spring 云网关过滤器中提取 WebClient GET 响应值
Extract WebClient GET response values within a Spring Cloud Gateway filter
我的最终目标是实现一种在网关路由过滤器主体内进行复合 API 调用的方法。我在端口 9000
上有一个非常基本的演示应用程序 运行 并公开了一些端点。这是 REST 控制器:
@RestController
@RequestMapping("/composite")
public class CompositeCallController {
@GetMapping("/test/one")
public Map<String, Object> first() {
Map<String, Object> output = new HashMap<>();
output.put("response-1-1", "FIRST 1");
output.put("response-1-2", "FIRST 2");
output.put("response-1-3", "FIRST 3");
return output;
}
@GetMapping("/test/two")
public Map<String, Object> second() {
Map<String, Object> output = new HashMap<>();
output.put("response-2-1", "SECOND 1");
output.put("response-2-2", "SECOND 2");
output.put("response-2-3", "SECOND 3");
return output;
}
@GetMapping
public Map<String, Object> init() {
return new HashMap<>();
}
}
两个控制器 return 只是一个简单的地图,里面有一些条目。我在单独的端口上有一个 Spring 云网关应用程序 运行,并且我通过 YML 配置了一条通往 localhost:9000/composite
端点的路由,return 是一张空白地图。然后我有一个 ModifyResponseBodyGatewayFilterFactory
过滤器,它启动并创建两个全新的请求,指向我的演示应用程序中的其他两个端点。
我想将这两个响应聚合为一个,方法是将它们传输到我 return 到过滤器链的新映射中。这是我的过滤器的外观:
public GatewayFilter apply(final Config config) {
final ModifyResponseBodyGatewayFilterFactory.Config modifyResponseBodyFilterFactoryConfig = new ModifyResponseBodyGatewayFilterFactory.Config();
modifyResponseBodyFilterFactoryConfig.setRewriteFunction(Map.class, Map.class, (exchange, body) -> {
WebClient client = WebClient.create();
Mono<Map<String, Object>> firstCallMono = client.get()
.uri(FIRST_SERVICE_URL)
.retrieve()
.bodyToMono(json);
Mono<Map<String, Object>> secondCallMono = client.get()
.uri(SECOND_SERVICE_URL)
.retrieve()
.bodyToMono(json);
Map<String, Object> output = new HashMap<>();
Mono.zip(firstCallMono, secondCallMono)
.log()
.subscribe(v -> {
System.out.println("FIRST VALUE = " + v.getT1());
System.out.println("SECOND VALUE = " + v.getT2());
output.put("1", v.getT1());
output.put("2", v.getT2());
});
System.out.println("OUTPUT VALUE 1 = " + output.get("1"));
System.out.println("OUTPUT VALUE 2 = " + output.get("2"));
return Mono.just(output);
});
return modifyResponseBodyFilterFactory.apply(modifyResponseBodyFilterFactoryConfig);
}
json
类型定义为private final ParameterizedTypeReference<Map<String, Object>> json = new ParameterizedTypeReference<>() {};
URI 如下:
public static final String FIRST_SERVICE_URL = "http://localhost:9000/composite/test/one";
public static final String SECOND_SERVICE_URL = "http://localhost:9000/composite/test/two";
这是我的网关配置供参考:
logging:
level:
reactor:
netty: INFO
org:
springframework:
cloud:
gateway: TRACE
spring:
codec:
max-in-memory-size: 20MB
cloud:
gateway:
httpclient:
wiretap: true
httpserver:
wiretap: true
routes:
- id: composite-call-test
uri: http://localhost:9000
predicates:
- Path=/composite/**
filters:
- CompositeApiCallFilter
为了合并 Monos,我使用了 Mono.zip()
,因为它似乎正好达到了这个目的。我特意在 zip()
正文中放置了两个 System.out.println()
,以确保来自上述两个 WebClient 请求的响应实际上是正确的,看起来确实如此:
FIRST VALUE = {response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1}
SECOND VALUE = {response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}
但是,我还在 zip()
之后放置了两个控制台打印,以检查地图中是否填充了某些内容,但由于某种原因它完全是空的:
OUTPUT VALUE 1 = null
OUTPUT VALUE 2 = null
这是请求的完整控制台输出以供参考:
2022-05-13 14:53:22.087 INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1 : onSubscribe([Fuseable] MonoZip.ZipCoordinator)
2022-05-13 14:53:22.090 INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1 : request(unbounded)
OUTPUT VALUE 1 = null
OUTPUT VALUE 2 = null
2022-05-13 14:53:22.139 INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1 : onNext([{response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1},{response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}])
FIRST VALUE = {response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1}
SECOND VALUE = {response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}
2022-05-13 14:53:22.140 INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1 : onComplete()
我尝试了很多其他方法来完成上述操作,例如通过使用 firstCallMono.mergeWith(secondCallMono)
将两个 Mono
合并为一个 Flux
,然后订阅生成的 Flux
对象并填充地图,但结果是相同的。
我还尝试将两个 Mono
放入一个 Pair
对象中并像这样提取值:
Pair<Mono<Map<String, Object>>, Mono<Map<String, Object>>> pair = new Pair(firstCall, secondCallDTOMono);
pair.getValue0().log().subscribe(v -> output.put("1", v));
pair.getValue1().log().subscribe(v -> output.put("2", v));
但是,output
地图最后还是空的,我不明白为什么。似乎从 WebClient .get()
调用返回的任何内容都是 MonoFlapMap.FlatMapMain
类型,我怀疑问题来自将此类型的值解包到我的常规 HashMap 中,但我不知道如何解决那个问题。我尝试使用 .map()
和 .flatMap()
但均无效。
有人可以告诉我如何提取这些值吗?
多亏了Toerktumlare的指点,我才能让它正常工作。这是完整的过滤器供参考:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.factory.rewrite.ModifyResponseBodyGatewayFilterFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
@Component
public class CompositeApiCallFilter extends AbstractGatewayFilterFactory<CompositeApiCallFilter.Config> {
public static final String COMPOSITE_TEST_URL = "http://localhost:9000/composite/test/";
private final ModifyResponseBodyGatewayFilterFactory modifyResponseBodyFilterFactory;
private final ParameterizedTypeReference<Map<String, Object>> json = new ParameterizedTypeReference<>() {
};
@Autowired
public CompositeApiCallFilter(ModifyResponseBodyGatewayFilterFactory factory) {
super(Config.class);
this.modifyResponseBodyFilterFactory = factory;
}
@Override
public GatewayFilter apply(final Config config) {
final ModifyResponseBodyGatewayFilterFactory.Config modifyResponseBodyFilterFactoryConfig = new ModifyResponseBodyGatewayFilterFactory.Config();
modifyResponseBodyFilterFactoryConfig.setRewriteFunction(Map.class, Map.class, (exchange, body) -> {
WebClient client = WebClient.create();
Mono<Map<String, Object>> firstCallMono = client.get()
.uri(COMPOSITE_TEST_URL + "one")
.retrieve()
.bodyToMono(json);
Mono<Map<String, Object>> secondCallMono = client.get()
.uri(COMPOSITE_TEST_URL + "two")
.retrieve()
.bodyToMono(json);
Map<String, Object> output = new HashMap<>();
return Mono.zip(firstCallMono, secondCallMono)
.flatMap(v -> {
output.put("1", v.getT1());
output.put("2", v.getT2());
return Mono.just(output);
});
});
return modifyResponseBodyFilterFactory.apply(modifyResponseBodyFilterFactoryConfig);
}
public static class Config {
}
}
而在 Postman 中的相应输出:
{
"1": {
"response-1-2": "FIRST 2",
"response-1-3": "FIRST 3",
"response-1-1": "FIRST 1"
},
"2": {
"response-2-3": "SECOND 3",
"response-2-1": "SECOND 1",
"response-2-2": "SECOND 2"
}
}
似乎根本不需要订阅,只需压缩单声道并使用 flatMap
提取它们的值就可以了。
我的最终目标是实现一种在网关路由过滤器主体内进行复合 API 调用的方法。我在端口 9000
上有一个非常基本的演示应用程序 运行 并公开了一些端点。这是 REST 控制器:
@RestController
@RequestMapping("/composite")
public class CompositeCallController {
@GetMapping("/test/one")
public Map<String, Object> first() {
Map<String, Object> output = new HashMap<>();
output.put("response-1-1", "FIRST 1");
output.put("response-1-2", "FIRST 2");
output.put("response-1-3", "FIRST 3");
return output;
}
@GetMapping("/test/two")
public Map<String, Object> second() {
Map<String, Object> output = new HashMap<>();
output.put("response-2-1", "SECOND 1");
output.put("response-2-2", "SECOND 2");
output.put("response-2-3", "SECOND 3");
return output;
}
@GetMapping
public Map<String, Object> init() {
return new HashMap<>();
}
}
两个控制器 return 只是一个简单的地图,里面有一些条目。我在单独的端口上有一个 Spring 云网关应用程序 运行,并且我通过 YML 配置了一条通往 localhost:9000/composite
端点的路由,return 是一张空白地图。然后我有一个 ModifyResponseBodyGatewayFilterFactory
过滤器,它启动并创建两个全新的请求,指向我的演示应用程序中的其他两个端点。
我想将这两个响应聚合为一个,方法是将它们传输到我 return 到过滤器链的新映射中。这是我的过滤器的外观:
public GatewayFilter apply(final Config config) {
final ModifyResponseBodyGatewayFilterFactory.Config modifyResponseBodyFilterFactoryConfig = new ModifyResponseBodyGatewayFilterFactory.Config();
modifyResponseBodyFilterFactoryConfig.setRewriteFunction(Map.class, Map.class, (exchange, body) -> {
WebClient client = WebClient.create();
Mono<Map<String, Object>> firstCallMono = client.get()
.uri(FIRST_SERVICE_URL)
.retrieve()
.bodyToMono(json);
Mono<Map<String, Object>> secondCallMono = client.get()
.uri(SECOND_SERVICE_URL)
.retrieve()
.bodyToMono(json);
Map<String, Object> output = new HashMap<>();
Mono.zip(firstCallMono, secondCallMono)
.log()
.subscribe(v -> {
System.out.println("FIRST VALUE = " + v.getT1());
System.out.println("SECOND VALUE = " + v.getT2());
output.put("1", v.getT1());
output.put("2", v.getT2());
});
System.out.println("OUTPUT VALUE 1 = " + output.get("1"));
System.out.println("OUTPUT VALUE 2 = " + output.get("2"));
return Mono.just(output);
});
return modifyResponseBodyFilterFactory.apply(modifyResponseBodyFilterFactoryConfig);
}
json
类型定义为private final ParameterizedTypeReference<Map<String, Object>> json = new ParameterizedTypeReference<>() {};
URI 如下:
public static final String FIRST_SERVICE_URL = "http://localhost:9000/composite/test/one";
public static final String SECOND_SERVICE_URL = "http://localhost:9000/composite/test/two";
这是我的网关配置供参考:
logging:
level:
reactor:
netty: INFO
org:
springframework:
cloud:
gateway: TRACE
spring:
codec:
max-in-memory-size: 20MB
cloud:
gateway:
httpclient:
wiretap: true
httpserver:
wiretap: true
routes:
- id: composite-call-test
uri: http://localhost:9000
predicates:
- Path=/composite/**
filters:
- CompositeApiCallFilter
为了合并 Monos,我使用了 Mono.zip()
,因为它似乎正好达到了这个目的。我特意在 zip()
正文中放置了两个 System.out.println()
,以确保来自上述两个 WebClient 请求的响应实际上是正确的,看起来确实如此:
FIRST VALUE = {response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1}
SECOND VALUE = {response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}
但是,我还在 zip()
之后放置了两个控制台打印,以检查地图中是否填充了某些内容,但由于某种原因它完全是空的:
OUTPUT VALUE 1 = null
OUTPUT VALUE 2 = null
这是请求的完整控制台输出以供参考:
2022-05-13 14:53:22.087 INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1 : onSubscribe([Fuseable] MonoZip.ZipCoordinator)
2022-05-13 14:53:22.090 INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1 : request(unbounded)
OUTPUT VALUE 1 = null
OUTPUT VALUE 2 = null
2022-05-13 14:53:22.139 INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1 : onNext([{response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1},{response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}])
FIRST VALUE = {response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1}
SECOND VALUE = {response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}
2022-05-13 14:53:22.140 INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1 : onComplete()
我尝试了很多其他方法来完成上述操作,例如通过使用 firstCallMono.mergeWith(secondCallMono)
将两个 Mono
合并为一个 Flux
,然后订阅生成的 Flux
对象并填充地图,但结果是相同的。
我还尝试将两个 Mono
放入一个 Pair
对象中并像这样提取值:
Pair<Mono<Map<String, Object>>, Mono<Map<String, Object>>> pair = new Pair(firstCall, secondCallDTOMono);
pair.getValue0().log().subscribe(v -> output.put("1", v));
pair.getValue1().log().subscribe(v -> output.put("2", v));
但是,output
地图最后还是空的,我不明白为什么。似乎从 WebClient .get()
调用返回的任何内容都是 MonoFlapMap.FlatMapMain
类型,我怀疑问题来自将此类型的值解包到我的常规 HashMap 中,但我不知道如何解决那个问题。我尝试使用 .map()
和 .flatMap()
但均无效。
有人可以告诉我如何提取这些值吗?
多亏了Toerktumlare的指点,我才能让它正常工作。这是完整的过滤器供参考:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.factory.rewrite.ModifyResponseBodyGatewayFilterFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
@Component
public class CompositeApiCallFilter extends AbstractGatewayFilterFactory<CompositeApiCallFilter.Config> {
public static final String COMPOSITE_TEST_URL = "http://localhost:9000/composite/test/";
private final ModifyResponseBodyGatewayFilterFactory modifyResponseBodyFilterFactory;
private final ParameterizedTypeReference<Map<String, Object>> json = new ParameterizedTypeReference<>() {
};
@Autowired
public CompositeApiCallFilter(ModifyResponseBodyGatewayFilterFactory factory) {
super(Config.class);
this.modifyResponseBodyFilterFactory = factory;
}
@Override
public GatewayFilter apply(final Config config) {
final ModifyResponseBodyGatewayFilterFactory.Config modifyResponseBodyFilterFactoryConfig = new ModifyResponseBodyGatewayFilterFactory.Config();
modifyResponseBodyFilterFactoryConfig.setRewriteFunction(Map.class, Map.class, (exchange, body) -> {
WebClient client = WebClient.create();
Mono<Map<String, Object>> firstCallMono = client.get()
.uri(COMPOSITE_TEST_URL + "one")
.retrieve()
.bodyToMono(json);
Mono<Map<String, Object>> secondCallMono = client.get()
.uri(COMPOSITE_TEST_URL + "two")
.retrieve()
.bodyToMono(json);
Map<String, Object> output = new HashMap<>();
return Mono.zip(firstCallMono, secondCallMono)
.flatMap(v -> {
output.put("1", v.getT1());
output.put("2", v.getT2());
return Mono.just(output);
});
});
return modifyResponseBodyFilterFactory.apply(modifyResponseBodyFilterFactoryConfig);
}
public static class Config {
}
}
而在 Postman 中的相应输出:
{
"1": {
"response-1-2": "FIRST 2",
"response-1-3": "FIRST 3",
"response-1-1": "FIRST 1"
},
"2": {
"response-2-3": "SECOND 3",
"response-2-1": "SECOND 1",
"response-2-2": "SECOND 2"
}
}
似乎根本不需要订阅,只需压缩单声道并使用 flatMap
提取它们的值就可以了。