Spring Gateway AsyncPredicate not working with reactor and flux

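We have written a custom Predicate factory for Spring-Gateway to route requests. We parse the body of the XML request and then derive the route from a specific method present in the body. To do this, we wrote the following code, which creates a ServerRequest.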

@Override
public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
        return exchange -> {
            Class<String> inClass = String.class;

            Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);

            if (cachedBody != null) {
                try {
                    boolean test = config.pattern.matcher((String) cachedBody).matches();
                    exchange.getAttributes().put(TEST_ATTRIBUTE, test);
                    return Mono.just(test);
                } catch (ClassCastException e) {
                    LOG.error("Predicate test failed because String.class does not match the cached body object", e);
                }
                return Mono.just(false);
            } else {

                return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

                    DataBufferUtils.retain(dataBuffer);

                    Flux<DataBuffer> cachedFlux = Flux
                            .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));

                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                    return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
                            .bodyToMono(inClass).doOnNext(objectValue -> {
                                exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                                exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachedFlux);
                            }).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
                });

            }
        };
    }

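This solution works perfectly with the older versions, Spring-Boot-Parent (2.1.7.RELEASE) and spring-cloud-dependencies (Greenwich.RELEASE). With the latest versions, Spring-Boot-Parent (2.3.1.RELEASE) and spring-cloud-dependencies (Hoxton.SR6), I get the following exception. The gateway application itself starts normally, without errors.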

Caused by: java.lang.ClassCastException: class reactor.core.publisher.FluxDefer cannot be cast to class org.springframework.core.io.buffer.PooledDataBuffer (reactor.core.publisher.FluxDefer and org.springframework.core.io.buffer.PooledDataBuffer are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @306a30c7)
        at org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter.lambda$filter$0(RemoveCachedBodyFilter.java:37) ~[spring-cloud-gateway-core-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156) ~[reactor-core-3.3.6.RELEASE.jar!/:3.3.6.RELEASE]
        ... 84 more

Has anyone else run into the same problem and know how to solve it?

The problem is that the Greenwich version of those APIs was beta. The object expected in CACHED_REQUEST_BODY_ATTR must now be a PooledDataBuffer, so I changed my code accordingly. It now looks like this:

return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

                    DataBufferUtils.retain(dataBuffer);

                    Flux<DataBuffer> cachedFlux = Flux
                            .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));

                    PooledDataBuffer cachePool = (PooledDataBuffer) dataBuffer.slice(0, dataBuffer.readableByteCount());
                    
                    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

                        @Override
                        public Flux<DataBuffer> getBody() {
                            return cachedFlux;
                        }
                    };
                    return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
                            .bodyToMono(inClass).doOnNext(objectValue -> {
                                exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                                exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachePool);
                            }).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
                });

After updating the class, it now works correctly.
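To illustrate why the type stored under CACHED_REQUEST_BODY_ATTR matters, here is a simplified plain-Java model of what the stack trace suggests: a cleanup step (RemoveCachedBodyFilter in the trace) takes the attribute out of the exchange and casts it to a pooled buffer type. This is only a sketch and not the gateway's actual code. The class FakePooledBuffer, the method cleanup, and the key value are hypothetical stand-ins I made up so the example runs without Spring on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

public class CachedBodyAttributeDemo {

    // Hypothetical stand-in for the gateway's attribute key.
    static final String CACHED_REQUEST_BODY_ATTR = "cachedRequestBody";

    // Hypothetical stand-in for a reference-counted buffer such as PooledDataBuffer.
    static final class FakePooledBuffer {
        private int refCount = 1;

        void release() {
            refCount--;
        }

        int refCount() {
            return refCount;
        }
    }

    // Models a cleanup step that removes the cached attribute and casts it
    // to the buffer type it expects (an assumption based on the stack trace).
    static String cleanup(Map<String, Object> attributes) {
        Object cached = attributes.remove(CACHED_REQUEST_BODY_ATTR);
        if (cached == null) {
            return "nothing cached";
        }
        try {
            FakePooledBuffer buffer = (FakePooledBuffer) cached;
            buffer.release();
            return "released";
        } catch (ClassCastException e) {
            // In the real application this exception is not caught and
            // surfaces as the error shown in the question.
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        Map<String, Object> attributes = new HashMap<>();

        // Storing something that is not the expected buffer type,
        // comparable to storing the Flux in the original code.
        attributes.put(CACHED_REQUEST_BODY_ATTR, new Object());
        System.out.println(cleanup(attributes));

        // Storing the expected buffer type, comparable to the fixed code.
        attributes.put(CACHED_REQUEST_BODY_ATTR, new FakePooledBuffer());
        System.out.println(cleanup(attributes));
    }
}
```

The point of the model is that the attribute map only holds Object values, so nothing complains when the wrong type is stored. The mismatch only shows up later, when the cleanup code runs at the end of the request, which matches the application starting fine and failing per request.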