在 Spring 云网关过滤器中提取 WebClient GET 响应值

Extract WebClient GET response values within a Spring Cloud Gateway filter

我的最终目标是实现一种在网关路由过滤器主体内进行复合 API 调用的方法。我在端口 9000 上有一个非常基本的演示应用程序 运行 并公开了一些端点。这是 REST 控制器:

@RestController
@RequestMapping("/composite")
public class CompositeCallController {

    @GetMapping("/test/one")
    public Map<String, Object> first() {
        Map<String, Object> output = new HashMap<>();
        output.put("response-1-1", "FIRST 1");
        output.put("response-1-2", "FIRST 2");
        output.put("response-1-3", "FIRST 3");
        return output;
    }

    @GetMapping("/test/two")
    public Map<String, Object> second() {
        Map<String, Object> output = new HashMap<>();
        output.put("response-2-1", "SECOND 1");
        output.put("response-2-2", "SECOND 2");
        output.put("response-2-3", "SECOND 3");
        return output;
    }

    @GetMapping
    public Map<String, Object> init() {
        return new HashMap<>();
    }
}

两个控制器 return 只是一个简单的地图,里面有一些条目。我在单独的端口上有一个 Spring 云网关应用程序 运行,并且我通过 YML 配置了一条通往 localhost:9000/composite 端点的路由,return 是一张空白地图。然后我有一个 ModifyResponseBodyGatewayFilterFactory 过滤器,它启动并创建两个全新的请求,指向我的演示应用程序中的其他两个端点。

我想将这两个响应聚合为一个,方法是将它们传输到我 return 到过滤器链的新映射中。这是我的过滤器的外观:

    public GatewayFilter apply(final Config config) {
        final ModifyResponseBodyGatewayFilterFactory.Config modifyResponseBodyFilterFactoryConfig = new ModifyResponseBodyGatewayFilterFactory.Config();
        modifyResponseBodyFilterFactoryConfig.setRewriteFunction(Map.class, Map.class, (exchange, body) -> {
            WebClient client = WebClient.create();

            Mono<Map<String, Object>> firstCallMono = client.get()
                    .uri(FIRST_SERVICE_URL)
                    .retrieve()
                    .bodyToMono(json);

            Mono<Map<String, Object>> secondCallMono = client.get()
                    .uri(SECOND_SERVICE_URL)
                    .retrieve()
                    .bodyToMono(json);

            Map<String, Object> output = new HashMap<>();
            Mono.zip(firstCallMono, secondCallMono)
                    .log()
                    .subscribe(v -> {
                        System.out.println("FIRST VALUE = " + v.getT1());
                        System.out.println("SECOND VALUE = " + v.getT2());
                        output.put("1", v.getT1());
                        output.put("2", v.getT2());
                    });

            System.out.println("OUTPUT VALUE 1 = " + output.get("1"));
            System.out.println("OUTPUT VALUE 2 = " + output.get("2"));

            return Mono.just(output);
        });
        return modifyResponseBodyFilterFactory.apply(modifyResponseBodyFilterFactoryConfig);
    }

json类型定义为private final ParameterizedTypeReference<Map<String, Object>> json = new ParameterizedTypeReference<>() {};

URI 如下:

public static final String FIRST_SERVICE_URL = "http://localhost:9000/composite/test/one";
public static final String SECOND_SERVICE_URL = "http://localhost:9000/composite/test/two";

这是我的网关配置供参考:

logging:
  level:
    reactor:
      netty: INFO
    org:
      springframework:
        cloud:
          gateway: TRACE

spring:
  codec:
    max-in-memory-size: 20MB
  cloud:
    gateway:
      httpclient:
        wiretap: true
      httpserver:
        wiretap: true
      routes:
        - id: composite-call-test
          uri: http://localhost:9000
          predicates:
            - Path=/composite/**
          filters:
            - CompositeApiCallFilter

为了合并 Monos,我使用了 Mono.zip(),因为它似乎正好达到了这个目的。我特意在 zip() 正文中放置了两个 System.out.println(),以确保来自上述两个 WebClient 请求的响应实际上是正确的,看起来确实如此:

FIRST VALUE = {response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1}
SECOND VALUE = {response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}

但是,我还在 zip() 之后放置了两个控制台打印,以检查地图中是否填充了某些内容,但由于某种原因它完全是空的:

OUTPUT VALUE 1 = null
OUTPUT VALUE 2 = null

这是请求的完整控制台输出以供参考:

2022-05-13 14:53:22.087  INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1                       : onSubscribe([Fuseable] MonoZip.ZipCoordinator)
2022-05-13 14:53:22.090  INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1                       : request(unbounded)
OUTPUT VALUE 1 = null
OUTPUT VALUE 2 = null
2022-05-13 14:53:22.139  INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1                       : onNext([{response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1},{response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}])
FIRST VALUE = {response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1}
SECOND VALUE = {response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}
2022-05-13 14:53:22.140  INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1                       : onComplete()

我尝试了很多其他方法来完成上述操作,例如通过使用 firstCallMono.mergeWith(secondCallMono) 将两个 Mono 合并为一个 Flux,然后订阅生成的 Flux 对象并填充地图,但结果是相同的。

我还尝试将两个 Mono 放入一个 Pair 对象中并像这样提取值:

Pair<Mono<Map<String, Object>>, Mono<Map<String, Object>>> pair = new Pair(firstCall, secondCallDTOMono);
pair.getValue0().log().subscribe(v -> output.put("1", v));
pair.getValue1().log().subscribe(v -> output.put("2", v));

但是,output 地图最后还是空的,我不明白为什么。似乎从 WebClient .get() 调用返回的任何内容都是 MonoFlapMap.FlatMapMain 类型,我怀疑问题来自将此类型的值解包到我的常规 HashMap 中,但我不知道如何解决那个问题。我尝试使用 .map().flatMap() 但均无效。

有人可以告诉我如何提取这些值吗?

多亏了Toerktumlare的指点,我才能让它正常工作。这是完整的过滤器供参考:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.factory.rewrite.ModifyResponseBodyGatewayFilterFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;

@Component
public class CompositeApiCallFilter extends AbstractGatewayFilterFactory<CompositeApiCallFilter.Config> {
    public static final String COMPOSITE_TEST_URL = "http://localhost:9000/composite/test/";
    private final ModifyResponseBodyGatewayFilterFactory modifyResponseBodyFilterFactory;
    private final ParameterizedTypeReference<Map<String, Object>> json = new ParameterizedTypeReference<>() {
    };

    @Autowired
    public CompositeApiCallFilter(ModifyResponseBodyGatewayFilterFactory factory) {
        super(Config.class);
        this.modifyResponseBodyFilterFactory = factory;
    }

    @Override
    public GatewayFilter apply(final Config config) {
        final ModifyResponseBodyGatewayFilterFactory.Config modifyResponseBodyFilterFactoryConfig = new ModifyResponseBodyGatewayFilterFactory.Config();
        modifyResponseBodyFilterFactoryConfig.setRewriteFunction(Map.class, Map.class, (exchange, body) -> {
            WebClient client = WebClient.create();

            Mono<Map<String, Object>> firstCallMono = client.get()
                    .uri(COMPOSITE_TEST_URL + "one")
                    .retrieve()
                    .bodyToMono(json);

            Mono<Map<String, Object>> secondCallMono = client.get()
                    .uri(COMPOSITE_TEST_URL + "two")
                    .retrieve()
                    .bodyToMono(json);

            Map<String, Object> output = new HashMap<>();

            return Mono.zip(firstCallMono, secondCallMono)
                    .flatMap(v -> {
                        output.put("1", v.getT1());
                        output.put("2", v.getT2());
                        return Mono.just(output);
                    });
        });

        return modifyResponseBodyFilterFactory.apply(modifyResponseBodyFilterFactoryConfig);
    }

    public static class Config {
    }
}

而在 Postman 中的相应输出:

{
    "1": {
        "response-1-2": "FIRST 2",
        "response-1-3": "FIRST 3",
        "response-1-1": "FIRST 1"
    },
    "2": {
        "response-2-3": "SECOND 3",
        "response-2-1": "SECOND 1",
        "response-2-2": "SECOND 2"
    }
}

似乎根本不需要订阅,只需压缩单声道并使用 flatMap 提取它们的值就可以了。