反应式编程 - Webflux Webfilter 行为不正常
Reactive Programming - Webflux Webfilter not behaving properly
我对反应式编程有点陌生,我正在尝试 assemble 以下内容:使用 Java、Springboot 2、Webflux 和 reactor core,我想处理需要额外身份验证的非常具体的请求。所以我正在通过一系列步骤实施 WebFilter
:
- 捕获请求的路径和方法。使用
accessPointService.getAccessPointAuthorizationRequirement
方法检查组合是否存在并需要特定身份验证(returns 带有布尔值的 Mono)。
- 因为我配置了 CSRF 和 Spring 安全性,所以我需要 csrf 令牌和 springsession 凭证。我发出 GET 和 POST 凭据请求。
- 然后使用凭据,我只需向可以执行一系列安全检查的服务 (authcheck) 发出 POST 请求(服务正常,在 Postman 和 Angular 中工作正常).
- 之后,我需要检索正文,将其转换为字符串并进行检查。现在这不会发生。
过滤器
@Override
public Mono<Void> filter(final ServerWebExchange serverWebExchange, final WebFilterChain webFilterChain) {
//client for specific requests.
WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
//get request for the CSRF cookie.
WebClient.RequestHeadersSpec<?> getRequest = webClient.get()
.uri("/login");
//post request for the spring security session cookie.
WebClient.RequestHeadersSpec<?> postRequest = webClient.post()
.uri("/login")
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED_VALUE)
.body(BodyInserters.fromFormData("username", "username")
.with("password", "password"));
//services that checks if the given request needs extra authentication
return accessPointService.getAccessPointAuthorizationRequirement(serverWebExchange.getRequest().getMethod().toString().toUpperCase(), serverWebExchange.getRequest().getPath().toString())
.log()
//gets the csrf token from the GET request
.flatMap(isRequired -> getRequest.exchangeToMono(response -> Mono.just(response.cookies().getFirst("XSRF-TOKEN").getValue())))
//combines the previous token with the POST request SESSION cookie,
//THEN secures the last request with both credentials
.zipWith(postRequest.exchangeToMono(resp -> Mono.just(resp.cookies().getFirst("SESSION").getValue())),
AuthenticationFilter::secureAuthRequest)
//gets the exchange from the request and converts the body into a String
.flatMap(AuthenticationFilter::getRequestExchange)
//code to validate if it's doing something. Not implemented yet because it never executes.
.flatMap(s -> Mono.just(s.equals("")))
.onErrorResume(e -> {
throw (CustomException) e;//breaks the execution
})
.then(webFilterChain.filter(serverWebExchange));//continues the execution
}
调用了 secureAuthRequest
和 getRequestExchange
方法
//adds the springsession cookie and csrf cookie to the request
private static WebClient.RequestHeadersSpec<?> secureAuthRequest(String csrf, String spring) {
WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
WebClient.RequestHeadersSpec<?> request = webClient.post()
.uri("/authcheck")
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
request.header("X-XSRF-TOKEN", csrf);
request.cookies( cookies -> cookies.add( "XSRF-TOKEN", csrf) );
request.header("Authorization", spring);
return request;
}
//gets the body as string.
private static Mono<String> getRequestExchange(WebClient.RequestHeadersSpec<?> securedReq) {
return securedReq.exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class));
}
但是当请求绑定认证时,日志如下:
2021-10-26 23:57:18.760 INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2021-10-26 23:57:18.761 INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4 : | request(unbounded)
2021-10-26 23:57:18.761 INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4 : | onNext(true)
2021-10-26 23:57:18.762 INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4 : | onComplete()
据我所知,数据流以订阅和后续请求开始(我认为 returns 来自 accessPointService.getAccessPointAuthorizationRequirement
方法 Mono 值的 TRUE,如果我错了请纠正我),但 'onComplete()' 日志出现了。我不知道 onComplete() 日志的确切含义,因为它是在执行 getRequestExchange 方法(被调用)之前显示的。 Mono.just(s.equals(""))
段代码永远不会执行。
我已经阅读了很多关于 'nothing happens until you subscribe' 的内容,但我仍然不知道如果我从未明确订阅流,为什么会调用反应流,而且我也不知道如何订阅实施它,因为它只有 returns 一个 Disposable(我想我可以从内部抛出异常?)。另外,我听说在调用多个订阅者时会解耦,所以我尽量避免它们。
任何有关反应式编程、reactor-core 或特定流程以及如何改进它的帮助,我们都将不胜感激。
干杯。
所以经过一些研究并感谢@Toerktumlare 的评论,并弄清楚了发生了什么以及我 changed/applied 对此做了什么。
所以对于'onComplete()
'日志,它标志着数据生产者的结束。因此,要查看操作的完整堆栈,我需要将每个生产者与自己的日志链接起来。例如:
Mono.just(Boolean.FALSE)
.log()
.flatMap(booleanVal -> Mono.just(booleanVal.toString()))
.log()
.subscribe(stringVal -> System.out.println("This is the boolean value " + stringVal));
这将为初始生产者和 flatMap 操作生成跟踪。
现在,进入主要问题,问题出在 getRequestExchange
方法中:
//gets the body as string.
private static Mono<String> getRequestExchange(WebClient.RequestHeadersSpec<?> securedReq) {
return securedReq.exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class));
}
问题隐藏在 bodyToMono
方法中。根据此站点 https://medium.com/@jeevjyotsinghchhabda/dont-let-webclient-s-bodytomono-trick-you-645123b3e0a9 ,如果对此请求的响应出于某种原因没有正文,则不会抛出任何错误,而只是 return 一个 Mono.empty()
。由于流程没有为这样的制作人准备,所以就到此为止了。
就我而言,问题是 spring 云安全。我在请求中提供了授权凭证,但没有提供关联的 SESSION cookie。所以请求 returned 了一个没有正文的 302 (Found)。这就是问题所在(不是反应流本身)。
所以,在那之后,我修改了请求,@Toerktumlare 的评论帮助我开发了一个可行的解决方案:
//service that returns if certain resource needs authentication or not, or if it's not even configured
return accessPointService.getAccessPointAuthorizationRequirement(serverWebExchange.getRequest().getMethod().toString().toUpperCase(), FWKUtils.translateAccessPointPath(serverWebExchange.getRequest().getPath().pathWithinApplication().elements()))
//if the response is a Mono Empty, then returns a not acceptable exception
.switchIfEmpty(Mono.defer(() -> throwNotAcceptable(serverWebExchange)))
//takes the boolean value to check if extra auth is needed.
.flatMap(isRequired -> validateAuthenticationRequirement(isRequired))
//gets the access token - the extra auth credential
.flatMap(isRequired -> getHeaderToken(serverWebExchange))
//from this access generates a WebClient to the specific authentication service - from a webClientProvider to not create too many WebClients.
.flatMap(accessToken -> generateAuthenticationRequest(webClientProvider.getInstance(), accessToken))
//gets the CRSF token credential and secures the request (adds it to the header and the cookies)
.zipWith(getCredential(webClientProvider.getInstance(), "csrf"), (securedRequest, csrfToken) -> secureAuthenticationRequest(securedRequest, csrfToken, "X-XSRF-TOKEN", "XSRF-TOKEN"))
//gets the SESSION (spring cloud security) token credential and secures the request (adds it to the header and the cookies)
.zipWith(getCredential(webClientProvider.getInstance(), "spring-cloud"), (securedRequest, sessionToken) -> secureAuthenticationRequest(securedRequest, sessionToken, "Authorization", "SESSION"))
//does the request and gets the response
.map(requestBodySpecs -> requestBodySpecs.retrieve())
//from the response, maps it to a specific DTO. The single() clause is to validate that a body is present.
.flatMap(clientResponse -> clientResponse.bodyToMono(SecurityCredentialResponseDTO.class).single())
//checks the authentication and throws a Unauthorizedstatus if its not valid.
.flatMap(responseDTO -> checkTokenAuthentication(serverWebExchange, responseDTO))
//if an error is present, then throws it
.onErrorResume(e -> {
if (e instanceof FWKException.GenericException) {
throw (FWKException.GenericException) e;
}
throw (RuntimeException) e;
})
//finally, continues the execution if no exception was thrown.
.then(webFilterChain.filter(serverWebExchange));
我在此解决方案中实现了更多内容(存储 CSRF 和 spring-cloud 凭证以避免不必要的调用)。
我对反应式编程有点陌生,我正在尝试 assemble 以下内容:使用 Java、Springboot 2、Webflux 和 reactor core,我想处理需要额外身份验证的非常具体的请求。所以我正在通过一系列步骤实施 WebFilter
:
- 捕获请求的路径和方法。使用
accessPointService.getAccessPointAuthorizationRequirement
方法检查组合是否存在并需要特定身份验证(returns 带有布尔值的 Mono)。 - 因为我配置了 CSRF 和 Spring 安全性,所以我需要 csrf 令牌和 springsession 凭证。我发出 GET 和 POST 凭据请求。
- 然后使用凭据,我只需向可以执行一系列安全检查的服务 (authcheck) 发出 POST 请求(服务正常,在 Postman 和 Angular 中工作正常).
- 之后,我需要检索正文,将其转换为字符串并进行检查。现在这不会发生。
过滤器
@Override
public Mono<Void> filter(final ServerWebExchange serverWebExchange, final WebFilterChain webFilterChain) {
//client for specific requests.
WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
//get request for the CSRF cookie.
WebClient.RequestHeadersSpec<?> getRequest = webClient.get()
.uri("/login");
//post request for the spring security session cookie.
WebClient.RequestHeadersSpec<?> postRequest = webClient.post()
.uri("/login")
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED_VALUE)
.body(BodyInserters.fromFormData("username", "username")
.with("password", "password"));
//services that checks if the given request needs extra authentication
return accessPointService.getAccessPointAuthorizationRequirement(serverWebExchange.getRequest().getMethod().toString().toUpperCase(), serverWebExchange.getRequest().getPath().toString())
.log()
//gets the csrf token from the GET request
.flatMap(isRequired -> getRequest.exchangeToMono(response -> Mono.just(response.cookies().getFirst("XSRF-TOKEN").getValue())))
//combines the previous token with the POST request SESSION cookie,
//THEN secures the last request with both credentials
.zipWith(postRequest.exchangeToMono(resp -> Mono.just(resp.cookies().getFirst("SESSION").getValue())),
AuthenticationFilter::secureAuthRequest)
//gets the exchange from the request and converts the body into a String
.flatMap(AuthenticationFilter::getRequestExchange)
//code to validate if it's doing something. Not implemented yet because it never executes.
.flatMap(s -> Mono.just(s.equals("")))
.onErrorResume(e -> {
throw (CustomException) e;//breaks the execution
})
.then(webFilterChain.filter(serverWebExchange));//continues the execution
}
调用了 secureAuthRequest
和 getRequestExchange
方法
//adds the springsession cookie and csrf cookie to the request
private static WebClient.RequestHeadersSpec<?> secureAuthRequest(String csrf, String spring) {
WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
WebClient.RequestHeadersSpec<?> request = webClient.post()
.uri("/authcheck")
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
request.header("X-XSRF-TOKEN", csrf);
request.cookies( cookies -> cookies.add( "XSRF-TOKEN", csrf) );
request.header("Authorization", spring);
return request;
}
//gets the body as string.
private static Mono<String> getRequestExchange(WebClient.RequestHeadersSpec<?> securedReq) {
return securedReq.exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class));
}
但是当请求绑定认证时,日志如下:
2021-10-26 23:57:18.760 INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2021-10-26 23:57:18.761 INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4 : | request(unbounded)
2021-10-26 23:57:18.761 INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4 : | onNext(true)
2021-10-26 23:57:18.762 INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4 : | onComplete()
据我所知,数据流以订阅和后续请求开始(我认为 returns 来自 accessPointService.getAccessPointAuthorizationRequirement
方法 Mono 值的 TRUE,如果我错了请纠正我),但 'onComplete()' 日志出现了。我不知道 onComplete() 日志的确切含义,因为它是在执行 getRequestExchange 方法(被调用)之前显示的。 Mono.just(s.equals(""))
段代码永远不会执行。
我已经阅读了很多关于 'nothing happens until you subscribe' 的内容,但我仍然不知道如果我从未明确订阅流,为什么会调用反应流,而且我也不知道如何订阅实施它,因为它只有 returns 一个 Disposable(我想我可以从内部抛出异常?)。另外,我听说在调用多个订阅者时会解耦,所以我尽量避免它们。
任何有关反应式编程、reactor-core 或特定流程以及如何改进它的帮助,我们都将不胜感激。
干杯。
所以经过一些研究并感谢@Toerktumlare 的评论,并弄清楚了发生了什么以及我 changed/applied 对此做了什么。
所以对于'onComplete()
'日志,它标志着数据生产者的结束。因此,要查看操作的完整堆栈,我需要将每个生产者与自己的日志链接起来。例如:
Mono.just(Boolean.FALSE)
.log()
.flatMap(booleanVal -> Mono.just(booleanVal.toString()))
.log()
.subscribe(stringVal -> System.out.println("This is the boolean value " + stringVal));
这将为初始生产者和 flatMap 操作生成跟踪。
现在,进入主要问题,问题出在 getRequestExchange
方法中:
//gets the body as string.
private static Mono<String> getRequestExchange(WebClient.RequestHeadersSpec<?> securedReq) {
return securedReq.exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class));
}
问题隐藏在 bodyToMono
方法中。根据此站点 https://medium.com/@jeevjyotsinghchhabda/dont-let-webclient-s-bodytomono-trick-you-645123b3e0a9 ,如果对此请求的响应出于某种原因没有正文,则不会抛出任何错误,而只是 return 一个 Mono.empty()
。由于流程没有为这样的制作人准备,所以就到此为止了。
就我而言,问题是 spring 云安全。我在请求中提供了授权凭证,但没有提供关联的 SESSION cookie。所以请求 returned 了一个没有正文的 302 (Found)。这就是问题所在(不是反应流本身)。
所以,在那之后,我修改了请求,@Toerktumlare 的评论帮助我开发了一个可行的解决方案:
//service that returns if certain resource needs authentication or not, or if it's not even configured
return accessPointService.getAccessPointAuthorizationRequirement(serverWebExchange.getRequest().getMethod().toString().toUpperCase(), FWKUtils.translateAccessPointPath(serverWebExchange.getRequest().getPath().pathWithinApplication().elements()))
//if the response is a Mono Empty, then returns a not acceptable exception
.switchIfEmpty(Mono.defer(() -> throwNotAcceptable(serverWebExchange)))
//takes the boolean value to check if extra auth is needed.
.flatMap(isRequired -> validateAuthenticationRequirement(isRequired))
//gets the access token - the extra auth credential
.flatMap(isRequired -> getHeaderToken(serverWebExchange))
//from this access generates a WebClient to the specific authentication service - from a webClientProvider to not create too many WebClients.
.flatMap(accessToken -> generateAuthenticationRequest(webClientProvider.getInstance(), accessToken))
//gets the CRSF token credential and secures the request (adds it to the header and the cookies)
.zipWith(getCredential(webClientProvider.getInstance(), "csrf"), (securedRequest, csrfToken) -> secureAuthenticationRequest(securedRequest, csrfToken, "X-XSRF-TOKEN", "XSRF-TOKEN"))
//gets the SESSION (spring cloud security) token credential and secures the request (adds it to the header and the cookies)
.zipWith(getCredential(webClientProvider.getInstance(), "spring-cloud"), (securedRequest, sessionToken) -> secureAuthenticationRequest(securedRequest, sessionToken, "Authorization", "SESSION"))
//does the request and gets the response
.map(requestBodySpecs -> requestBodySpecs.retrieve())
//from the response, maps it to a specific DTO. The single() clause is to validate that a body is present.
.flatMap(clientResponse -> clientResponse.bodyToMono(SecurityCredentialResponseDTO.class).single())
//checks the authentication and throws a Unauthorizedstatus if its not valid.
.flatMap(responseDTO -> checkTokenAuthentication(serverWebExchange, responseDTO))
//if an error is present, then throws it
.onErrorResume(e -> {
if (e instanceof FWKException.GenericException) {
throw (FWKException.GenericException) e;
}
throw (RuntimeException) e;
})
//finally, continues the execution if no exception was thrown.
.then(webFilterChain.filter(serverWebExchange));
我在此解决方案中实现了更多内容(存储 CSRF 和 spring-cloud 凭证以避免不必要的调用)。