Spring Gateway AsyncPredicate not working with Reactor and Flux
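We have written a custom Predicate factory for Spring-Gateway to route requests. We parse the body of the XML request and then derive the route from a specific method present in the body. To do this, we wrote the following code to create a ServerRequest.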
@Override
public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
    return exchange -> {
        Class<String> inClass = String.class;
        Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
        if (cachedBody != null) {
            try {
                boolean test = config.pattern.matcher((String) cachedBody).matches();
                exchange.getAttributes().put(TEST_ATTRIBUTE, test);
                return Mono.just(test);
            } catch (ClassCastException e) {
                LOG.error("Predicate test failed because String.class does not match the cached body object", e);
            }
            return Mono.just(false);
        } else {
            return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
                DataBufferUtils.retain(dataBuffer);
                Flux<DataBuffer> cachedFlux = Flux
                        .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
                    @Override
                    public Flux<DataBuffer> getBody() {
                        return cachedFlux;
                    }
                };
                return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
                        .bodyToMono(inClass).doOnNext(objectValue -> {
                            exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                            exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachedFlux);
                        }).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
            });
        }
    };
}
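This solution works perfectly with the older Spring-Boot-Parent (2.1.7.RELEASE) and spring-cloud-dependencies (Greenwich.RELEASE). But with the latest Spring-Boot-Parent (2.3.1.RELEASE) and spring-cloud-dependencies (Hoxton.SR6), I get the following exception. The gateway application itself starts normally, without any errors.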
Caused by: java.lang.ClassCastException: class reactor.core.publisher.FluxDefer cannot be cast to class org.springframework.core.io.buffer.PooledDataBuffer (reactor.core.publisher.FluxDefer and org.springframework.core.io.buffer.PooledDataBuffer are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @306a30c7)
at org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter.lambda$filter$0(RemoveCachedBodyFilter.java:37) ~[spring-cloud-gateway-core-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156) ~[reactor-core-3.3.6.RELEASE.jar!/:3.3.6.RELEASE]
... 84 more
Has anyone else run into the same problem and know how to solve it?
The problem is that the Greenwich version of those APIs was beta. The object expected in CACHED_REQUEST_BODY_ATTR must now be a PooledDataBuffer, so I changed my code accordingly. It now looks like this:
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
    DataBufferUtils.retain(dataBuffer);
    Flux<DataBuffer> cachedFlux = Flux
            .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
    PooledDataBuffer cachePool = (PooledDataBuffer) dataBuffer.slice(0, dataBuffer.readableByteCount());
    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
        @Override
        public Flux<DataBuffer> getBody() {
            return cachedFlux;
        }
    };
    return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
            .bodyToMono(inClass).doOnNext(objectValue -> {
                exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachePool);
            }).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
});
After updating the class, it now works correctly.
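For anyone wondering why the application starts fine and only fails per request: the stack trace suggests that RemoveCachedBodyFilter takes whatever is stored under CACHED_REQUEST_BODY_ATTR when the exchange finishes and casts it, so the type is only checked at cleanup time. Below is a minimal plain-Java sketch of that failure mode. It does not use the Spring classes; CACHED_BODY_ATTR, FakePooledBuffer, cleanup and cleanupFails are hypothetical stand-ins invented for illustration, not the gateway's actual code.

```java
import java.util.HashMap;
import java.util.Map;

public class CachedBodyAttributeSketch {

    // Hypothetical stand-in for the gateway's CACHED_REQUEST_BODY_ATTR key.
    static final String CACHED_BODY_ATTR = "cachedRequestBody";

    // Hypothetical stand-in for PooledDataBuffer: a resource that must be released.
    static final class FakePooledBuffer {
        private boolean released = false;

        void release() {
            released = true;
        }

        boolean isReleased() {
            return released;
        }
    }

    // Assumed shape of the cleanup step: remove the attribute and cast it
    // without checking its type first.
    static void cleanup(Map<String, Object> attributes) {
        FakePooledBuffer buffer = (FakePooledBuffer) attributes.remove(CACHED_BODY_ATTR);
        if (buffer != null) {
            buffer.release();
        }
    }

    // Returns true if the cleanup step throws a ClassCastException
    // for the given cached value.
    static boolean cleanupFails(Object cachedValue) {
        Map<String, Object> attributes = new HashMap<>();
        attributes.put(CACHED_BODY_ATTR, cachedValue);
        try {
            cleanup(attributes);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Storing a wrapper object (like the Flux in the question) breaks the cast.
        System.out.println(cleanupFails("a Flux-like wrapper")); // prints "true"
        // Storing the buffer itself (like cachePool in the answer) works.
        System.out.println(cleanupFails(new FakePooledBuffer())); // prints "false"
    }
}
```

Because the attribute map holds plain Object values, the compiler cannot catch the mismatch; it only surfaces when the cleanup code reads the attribute back.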