WebFlux 应用程序中的 WebFilter
WebFilter in WebFlux application
我有一个 Spring 使用 Spring Boot 2.0.0.M5/2.0.0.BUILD-SNAPSHOT 的 Boot WebFlux 应用程序。
我需要向所有日志添加 trace-id。
为了让它在 WebFlux 应用程序中工作,我尝试使用 here and here
中描述的 WebFilter 方法
@Component
public class TraceIdFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain.filter(exchange).subscriberContext((Context context) ->
context.put(AuditContext.class, getAuditContext(exchange.getRequest().getHeaders()))
);
}
我的控制器
@GetMapping(value = "/some_mapping")
public Mono<ResponseEntity<WrappedResponse>> getResource(@PathVariable("resourceId") String id) {
Mono.subscriberContext().flatMap(context -> {
AuditContext auditContext = context.get(AuditContext.class);
...
});
我遇到的问题是过滤器方法从未执行过,并且未设置上下文。我已经确认启动时加载了 Webfilter。
是否需要其他任何东西才能使过滤器正常工作?
原来这不起作用的原因是因为我对 spring-boot-starter-web 和 spring-boot-starter-webflux 都有依赖。
compile("org.springframework.boot:spring-boot-starter-web")
compile("org.springframework.boot:spring-boot-starter-webflux")
我添加 spring-boot-starter-web 的原因是因为当我删除依赖项时出现以下异常
Caused by: java.io.FileNotFoundException: class path resource [org/springframework/web/servlet/mvc/method/annotation/ResponseEntityExceptionHandler.class] cannot be opened because it does not exist
at org.springframework.core.io.ClassPathResource.getInputStream(ClassPathResource.java:177) ~[spring-core-5.0.0.RELEASE.jar:5.0.0.RELEASE]
at org.springframework.core.type.classreading.SimpleMetadataReader.<init>(SimpleMetadataReader.java:51) ~[spring-core-5.0.0.RELEASE.jar:5.0.0.RELEASE]
at org.springframework.core.type.classreading.SimpleMetadataReaderFactory.getMetadataReader(SimpleMetadataReaderFactory.java:99) ~[spring-core-5.0.0.RELEASE.jar:5.0.0.RELEASE]
我发现我收到此错误的原因是因为我有一个自定义启动器,其配置 class 在 EnableAutoConfiguration
中
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.x.y.z.MyConfiguration
此配置 class 在组件扫描期间也被选中,这似乎导致了一些问题。
删除对 spring-boot-starter-web 的依赖后,WebFilter 开始工作。
我在解决这个问题时遇到了很多问题,希望它能对某人有所帮助。我的用例是验证请求的签名。这要求我解析 PUT/POST 的请求正文。我看到的另一个主要用例是日志记录,所以下面的内容也会有所帮助。
MiddlewareAuthenticator.java
@Component
public class MiddlewareAuthenticator implements WebFilter {
@Autowired private RequestValidationService requestValidationService;
@Override
public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain chain) {
return HEALTH_ENDPOINTS
.matches(serverWebExchange)
.flatMap(
matches -> {
if (matches.isMatch()) {
return chain.filter(serverWebExchange);
} else {
return requestValidationService
.validate(serverWebExchange,
new BiPredicate<ServerWebExchange, String> {
@Override
public boolean test(ServerWebExchange e, String body) {
/** application logic can go here. few points:
1. I used a BiPredicate because I just need a true or false if the request should be passed to the controller.
2. If you want todo other mutations you could swap the predicate to a normal function and return a mutated ServerWebExchange.
3. I pass body separately here to ensure safety of accessing the request body and not having to rewrap the ServerWebExchange. A side affect of this though is any mutations to the String body do not affect downstream.
**/
return true;
}
})
.flatMap((ServerWebExchange r) -> chain.filter(r));
}});
}
RequestValidationService.java
@Service
public class RequestValidationService {
private DataBuffer stringBuffer(String value) {
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
NettyDataBufferFactory nettyDataBufferFactory =
new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
buffer.write(bytes);
return buffer;
}
private String bodyToString(InputStream bodyBytes) {
byte[] currArr = null;
try {
currArr = bodyBytes.readAllBytes();
bodyBytes.read(currArr);
} catch (IOException ioe) {
throw new RuntimeException("could not parse body");
}
if (currArr.length == 0) {
return null;
}
return new String(currArr, StandardCharsets.UTF_8);
}
private ServerHttpRequestDecorator requestWrapper(ServerHttpRequest request, String bodyStr) {
URI uri = request.getURI();
ServerHttpRequest newRequest = request.mutate().uri(uri).build();
final DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
Flux<DataBuffer> newBodyFlux = Flux.just(bodyDataBuffer);
ServerHttpRequestDecorator requestDecorator =
new ServerHttpRequestDecorator(newRequest) {
@Override
public Flux<DataBuffer> getBody() {
return newBodyFlux;
}
};
return requestDecorator;
}
private InputStream newInputStream() {
return new InputStream() {
public int read() {
return -1;
}
};
}
private InputStream processRequestBody(InputStream s, DataBuffer d) {
SequenceInputStream seq = new SequenceInputStream(s, d.asInputStream());
return seq;
}
private Mono<ServerWebExchange> processInputStream(
InputStream aggregatedBodyBytes,
ServerWebExchange exchange,
BiPredicate<ServerHttpRequest, String> predicate) {
ServerHttpRequest request = exchange.getRequest();
HttpHeaders headers = request.getHeaders();
String bodyStr = bodyToString(aggregatedBodyBytes);
ServerWebExchange mutatedExchange = exchange;
// if the body exists on the request we need to mutate the ServerWebExchange to not
// reparse the body because DataBuffers can only be read once;
if (bodyStr != null) {
mutatedExchange = exchange.mutate().request(requestWrapper(request, bodyStr)).build();
}
ServerHttpRequest mutatedRequest = mutatedExchange.getRequest();
if (predicate.test(mutatedRequest, bodyStr)) {
return Mono.just(mutatedExchange);
}
return Mono.error(new RuntimeException("invalid signature"));
}
/*
* Because the DataBuffer is in a Flux we must reduce it to a Mono type via Flux.reduce
* This covers large payloads or requests bodies that get sent in multiple byte chunks
* and need to be concatentated.
*
* 1. The reduce is initialized with a newInputStream
* 2. processRequestBody is called on each step of the Flux where a step is a body byte
* chunk. The method processRequestBody casts the Inbound DataBuffer to a InputStream
* and concats the new InputStream with the existing one
* 3. Once the Flux is complete flatMap is executed with the resulting InputStream which is
* passed with the ServerWebExchange to processInputStream which will do the request validation
*/
public Mono<ServerWebExchange> validate(
ServerWebExchange exchange, BiPredicate<ServerHttpRequest, String> p) {
Flux<DataBuffer> body = exchange.getRequest().getBody();
return body.reduce(newInputStream(), this::processRequestBody)
.flatMap((InputStream b) -> processInputStream(b, exchange, p));
}
}
BiPredicate 文档:https://docs.oracle.com/javase/8/docs/api/java/util/function/BiPredicate.html
我有类似的问题,我对两者都有依赖性
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
和
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
我们必须在 spring-boot-starter-web 中为 tomcat 和 netty 发布排除。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-reactor-netty</artifactId>
</exclusion>
</exclusions>
</dependency>
我有一个 Spring 使用 Spring Boot 2.0.0.M5/2.0.0.BUILD-SNAPSHOT 的 Boot WebFlux 应用程序。 我需要向所有日志添加 trace-id。
为了让它在 WebFlux 应用程序中工作,我尝试使用 here and here
中描述的 WebFilter 方法@Component
public class TraceIdFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain.filter(exchange).subscriberContext((Context context) ->
context.put(AuditContext.class, getAuditContext(exchange.getRequest().getHeaders()))
);
}
我的控制器
@GetMapping(value = "/some_mapping")
public Mono<ResponseEntity<WrappedResponse>> getResource(@PathVariable("resourceId") String id) {
Mono.subscriberContext().flatMap(context -> {
AuditContext auditContext = context.get(AuditContext.class);
...
});
我遇到的问题是过滤器方法从未执行过,并且未设置上下文。我已经确认启动时加载了 Webfilter。 是否需要其他任何东西才能使过滤器正常工作?
原来这不起作用的原因是因为我对 spring-boot-starter-web 和 spring-boot-starter-webflux 都有依赖。
compile("org.springframework.boot:spring-boot-starter-web")
compile("org.springframework.boot:spring-boot-starter-webflux")
我添加 spring-boot-starter-web 的原因是因为当我删除依赖项时出现以下异常
Caused by: java.io.FileNotFoundException: class path resource [org/springframework/web/servlet/mvc/method/annotation/ResponseEntityExceptionHandler.class] cannot be opened because it does not exist
at org.springframework.core.io.ClassPathResource.getInputStream(ClassPathResource.java:177) ~[spring-core-5.0.0.RELEASE.jar:5.0.0.RELEASE]
at org.springframework.core.type.classreading.SimpleMetadataReader.<init>(SimpleMetadataReader.java:51) ~[spring-core-5.0.0.RELEASE.jar:5.0.0.RELEASE]
at org.springframework.core.type.classreading.SimpleMetadataReaderFactory.getMetadataReader(SimpleMetadataReaderFactory.java:99) ~[spring-core-5.0.0.RELEASE.jar:5.0.0.RELEASE]
我发现我收到此错误的原因是因为我有一个自定义启动器,其配置 class 在 EnableAutoConfiguration
中org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.x.y.z.MyConfiguration
此配置 class 在组件扫描期间也被选中,这似乎导致了一些问题。 删除对 spring-boot-starter-web 的依赖后,WebFilter 开始工作。
我在解决这个问题时遇到了很多问题,希望它能对某人有所帮助。我的用例是验证请求的签名。这要求我解析 PUT/POST 的请求正文。我看到的另一个主要用例是日志记录,所以下面的内容也会有所帮助。
MiddlewareAuthenticator.java
@Component
public class MiddlewareAuthenticator implements WebFilter {
@Autowired private RequestValidationService requestValidationService;
@Override
public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain chain) {
return HEALTH_ENDPOINTS
.matches(serverWebExchange)
.flatMap(
matches -> {
if (matches.isMatch()) {
return chain.filter(serverWebExchange);
} else {
return requestValidationService
.validate(serverWebExchange,
new BiPredicate<ServerWebExchange, String> {
@Override
public boolean test(ServerWebExchange e, String body) {
/** application logic can go here. few points:
1. I used a BiPredicate because I just need a true or false if the request should be passed to the controller.
2. If you want todo other mutations you could swap the predicate to a normal function and return a mutated ServerWebExchange.
3. I pass body separately here to ensure safety of accessing the request body and not having to rewrap the ServerWebExchange. A side affect of this though is any mutations to the String body do not affect downstream.
**/
return true;
}
})
.flatMap((ServerWebExchange r) -> chain.filter(r));
}});
}
RequestValidationService.java
@Service
public class RequestValidationService {
private DataBuffer stringBuffer(String value) {
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
NettyDataBufferFactory nettyDataBufferFactory =
new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);
buffer.write(bytes);
return buffer;
}
private String bodyToString(InputStream bodyBytes) {
byte[] currArr = null;
try {
currArr = bodyBytes.readAllBytes();
bodyBytes.read(currArr);
} catch (IOException ioe) {
throw new RuntimeException("could not parse body");
}
if (currArr.length == 0) {
return null;
}
return new String(currArr, StandardCharsets.UTF_8);
}
private ServerHttpRequestDecorator requestWrapper(ServerHttpRequest request, String bodyStr) {
URI uri = request.getURI();
ServerHttpRequest newRequest = request.mutate().uri(uri).build();
final DataBuffer bodyDataBuffer = stringBuffer(bodyStr);
Flux<DataBuffer> newBodyFlux = Flux.just(bodyDataBuffer);
ServerHttpRequestDecorator requestDecorator =
new ServerHttpRequestDecorator(newRequest) {
@Override
public Flux<DataBuffer> getBody() {
return newBodyFlux;
}
};
return requestDecorator;
}
private InputStream newInputStream() {
return new InputStream() {
public int read() {
return -1;
}
};
}
private InputStream processRequestBody(InputStream s, DataBuffer d) {
SequenceInputStream seq = new SequenceInputStream(s, d.asInputStream());
return seq;
}
private Mono<ServerWebExchange> processInputStream(
InputStream aggregatedBodyBytes,
ServerWebExchange exchange,
BiPredicate<ServerHttpRequest, String> predicate) {
ServerHttpRequest request = exchange.getRequest();
HttpHeaders headers = request.getHeaders();
String bodyStr = bodyToString(aggregatedBodyBytes);
ServerWebExchange mutatedExchange = exchange;
// if the body exists on the request we need to mutate the ServerWebExchange to not
// reparse the body because DataBuffers can only be read once;
if (bodyStr != null) {
mutatedExchange = exchange.mutate().request(requestWrapper(request, bodyStr)).build();
}
ServerHttpRequest mutatedRequest = mutatedExchange.getRequest();
if (predicate.test(mutatedRequest, bodyStr)) {
return Mono.just(mutatedExchange);
}
return Mono.error(new RuntimeException("invalid signature"));
}
/*
* Because the DataBuffer is in a Flux we must reduce it to a Mono type via Flux.reduce
* This covers large payloads or requests bodies that get sent in multiple byte chunks
* and need to be concatentated.
*
* 1. The reduce is initialized with a newInputStream
* 2. processRequestBody is called on each step of the Flux where a step is a body byte
* chunk. The method processRequestBody casts the Inbound DataBuffer to a InputStream
* and concats the new InputStream with the existing one
* 3. Once the Flux is complete flatMap is executed with the resulting InputStream which is
* passed with the ServerWebExchange to processInputStream which will do the request validation
*/
public Mono<ServerWebExchange> validate(
ServerWebExchange exchange, BiPredicate<ServerHttpRequest, String> p) {
Flux<DataBuffer> body = exchange.getRequest().getBody();
return body.reduce(newInputStream(), this::processRequestBody)
.flatMap((InputStream b) -> processInputStream(b, exchange, p));
}
}
BiPredicate 文档:https://docs.oracle.com/javase/8/docs/api/java/util/function/BiPredicate.html
我有类似的问题,我对两者都有依赖性
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
和
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
我们必须在 spring-boot-starter-web 中为 tomcat 和 netty 发布排除。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-reactor-netty</artifactId>
</exclusion>
</exclusions>
</dependency>