反应式编程 - Webflux Webfilter 行为不正常

Reactive Programming - Webflux Webfilter not behaving properly

我对反应式编程有点陌生,我正在尝试 assemble 以下内容:使用 Java、Springboot 2、Webflux 和 reactor core,我想处理需要额外身份验证的非常具体的请求。所以我正在通过一系列步骤实施 WebFilter

过滤器

@Override
    public Mono<Void> filter(final ServerWebExchange serverWebExchange, final WebFilterChain webFilterChain) {

        //client for specific requests.
        WebClient webClient = WebClient.builder()
                .baseUrl("http://localhost:8080")
                .build();
        //get request for the CSRF cookie.
        WebClient.RequestHeadersSpec<?> getRequest = webClient.get()
                .uri("/login");
        //post request for the spring security session cookie.
        WebClient.RequestHeadersSpec<?> postRequest = webClient.post()
                .uri("/login")
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_FORM_URLENCODED_VALUE)
                .body(BodyInserters.fromFormData("username", "username")
                        .with("password", "password"));
                        //services that checks if the given request needs extra authentication
        return accessPointService.getAccessPointAuthorizationRequirement(serverWebExchange.getRequest().getMethod().toString().toUpperCase(), serverWebExchange.getRequest().getPath().toString())
                .log()
                //gets the csrf token from the GET request
                .flatMap(isRequired -> getRequest.exchangeToMono(response -> Mono.just(response.cookies().getFirst("XSRF-TOKEN").getValue())))
                //combines the previous token with the POST request SESSION cookie,
                //THEN secures the last request with both credentials
                .zipWith(postRequest.exchangeToMono(resp -> Mono.just(resp.cookies().getFirst("SESSION").getValue())),
                        AuthenticationFilter::secureAuthRequest)
                //gets the exchange from the request and converts the body into a String
                .flatMap(AuthenticationFilter::getRequestExchange)
                //code to validate if it's doing something. Not implemented yet because it never executes.
                .flatMap(s -> Mono.just(s.equals("")))
                .onErrorResume(e -> {
                    throw (CustomException) e;//breaks the execution
                })
                .then(webFilterChain.filter(serverWebExchange));//continues the execution
    }

调用了 secureAuthRequestgetRequestExchange 方法

//adds the springsession cookie and csrf cookie to the request
private static WebClient.RequestHeadersSpec<?> secureAuthRequest(String csrf, String spring) {

        WebClient webClient = WebClient.builder()
                .baseUrl("http://localhost:8080")
                .build();
        WebClient.RequestHeadersSpec<?> request = webClient.post()
                .uri("/authcheck")
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
        request.header("X-XSRF-TOKEN", csrf);
        request.cookies( cookies -> cookies.add(  "XSRF-TOKEN", csrf) );
        request.header("Authorization", spring);
        return request;
    }

//gets the body as string.
private static Mono<String> getRequestExchange(WebClient.RequestHeadersSpec<?> securedReq) {

        return securedReq.exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class));
    }

但是当请求绑定认证时,日志如下:

2021-10-26 23:57:18.760  INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2021-10-26 23:57:18.761  INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4                      : | request(unbounded)
2021-10-26 23:57:18.761  INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4                      : | onNext(true)
2021-10-26 23:57:18.762  INFO 6860 --- [ctor-http-nio-4] reactor.Mono.Just.4                      : | onComplete()

据我所知,数据流以订阅和后续请求开始(我认为 returns 来自 accessPointService.getAccessPointAuthorizationRequirement 方法 Mono 值的 TRUE,如果我错了请纠正我),但 'onComplete()' 日志出现了。我不知道 onComplete() 日志的确切含义,因为它是在执行 getRequestExchange 方法(被调用)之前显示的。 Mono.just(s.equals("")) 段代码永远不会执行。

我已经阅读了很多关于 'nothing happens until you subscribe' 的内容,但我仍然不知道如果我从未明确订阅流,为什么会调用反应流,而且我也不知道如何订阅实施它,因为它只有 returns 一个 Disposable(我想我可以从内部抛出异常?)。另外,我听说在调用多个订阅者时会解耦,所以我尽量避免它们。

任何有关反应式编程、reactor-core 或特定流程以及如何改进它的帮助,我们都将不胜感激。

干杯。

所以经过一些研究并感谢@Toerktumlare 的评论,并弄清楚了发生了什么以及我 changed/applied 对此做了什么。

所以对于'onComplete()'日志,它标志着数据生产者的结束。因此,要查看操作的完整堆栈,我需要将每个生产者与自己的日志链接起来。例如:

Mono.just(Boolean.FALSE)
    .log()
    .flatMap(booleanVal -> Mono.just(booleanVal.toString()))
    .log()
    .subscribe(stringVal -> System.out.println("This is the boolean value " + stringVal));

这将为初始生产者和 flatMap 操作生成跟踪。

现在,进入主要问题,问题出在 getRequestExchange 方法中:

//gets the body as string.
private static Mono<String> getRequestExchange(WebClient.RequestHeadersSpec<?> securedReq) {

        return securedReq.exchangeToMono(clientResponse -> clientResponse.bodyToMono(String.class));
    }

问题隐藏在 bodyToMono 方法中。根据此站点 https://medium.com/@jeevjyotsinghchhabda/dont-let-webclient-s-bodytomono-trick-you-645123b3e0a9 ,如果对此请求的响应出于某种原因没有正文,则不会抛出任何错误,而只是 return 一个 Mono.empty()。由于流程没有为这样的制作人准备,所以就到此为止了。

就我而言,问题是 spring 云安全。我在请求中提供了授权凭证,但没有提供关联的 SESSION cookie。所以请求 returned 了一个没有正文的 302 (Found)。这就是问题所在(不是反应流本身)。

所以,在那之后,我修改了请求,@Toerktumlare 的评论帮助我开发了一个可行的解决方案:

//service that returns if certain resource needs authentication or not, or if it's not even configured
return accessPointService.getAccessPointAuthorizationRequirement(serverWebExchange.getRequest().getMethod().toString().toUpperCase(), FWKUtils.translateAccessPointPath(serverWebExchange.getRequest().getPath().pathWithinApplication().elements()))
                //if the response is a Mono Empty, then returns a not acceptable exception
                .switchIfEmpty(Mono.defer(() -> throwNotAcceptable(serverWebExchange)))
                //takes the boolean value to check if extra auth is needed.
                .flatMap(isRequired -> validateAuthenticationRequirement(isRequired))
                //gets the access token - the extra auth credential
                .flatMap(isRequired -> getHeaderToken(serverWebExchange))
                //from this access generates a WebClient to the specific authentication service - from a webClientProvider to not create too many WebClients.
                .flatMap(accessToken -> generateAuthenticationRequest(webClientProvider.getInstance(), accessToken))
                //gets the CRSF token credential and secures the request (adds it to the header and the cookies)
                .zipWith(getCredential(webClientProvider.getInstance(), "csrf"), (securedRequest, csrfToken) -> secureAuthenticationRequest(securedRequest, csrfToken, "X-XSRF-TOKEN", "XSRF-TOKEN"))
                //gets the SESSION (spring cloud security) token credential and secures the request (adds it to the header and the cookies)
                .zipWith(getCredential(webClientProvider.getInstance(), "spring-cloud"), (securedRequest, sessionToken) -> secureAuthenticationRequest(securedRequest, sessionToken, "Authorization", "SESSION"))
                //does the request and gets the response
                .map(requestBodySpecs -> requestBodySpecs.retrieve())
                //from the response, maps it to a specific DTO. The single() clause is to validate that a body is present.
                .flatMap(clientResponse -> clientResponse.bodyToMono(SecurityCredentialResponseDTO.class).single())
                //checks the authentication and throws a Unauthorizedstatus if its not valid.
                .flatMap(responseDTO -> checkTokenAuthentication(serverWebExchange, responseDTO))
                //if an error is present, then throws it 
                .onErrorResume(e -> {
                    if (e instanceof FWKException.GenericException) {
                        throw (FWKException.GenericException) e;
                    }
                    throw (RuntimeException) e;
                })
                //finally, continues the execution if no exception was thrown.
                .then(webFilterChain.filter(serverWebExchange));

我在此解决方案中实现了更多内容(存储 CSRF 和 spring-cloud 凭证以避免不必要的调用)。