How do you make WebFilter process requests that are not already handled by GlobalFilter?

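GlobalFilter and WebFilter are similar. I want to do some header manipulation and logging, but if the gateway has already handled a request through a GlobalFilter, the response has already been set.

Mainly, I only want the WebFilter to apply to calls that are handled by the request handlers in my own code, not to calls handled by the gateway itself.

The GlobalFilter I wrote: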

@Component
@RequiredArgsConstructor
@Slf4j(topic = "request")
public class PerformanceRequestIDPostGatewayFilter implements GlobalFilter {

  private final Tracing tracing;

  private static final String LOG_MESSAGE_FORMAT = "{} {} {} {}ms";

  @Override
  public Mono<Void> filter(final ServerWebExchange exchange, GatewayFilterChain chain) {

    final long startNanos = System.nanoTime();
    return chain
        .filter(exchange)
        .doOnEach(
            WebFluxSleuthOperators.withSpanInScope(
                () -> {
                  final String traceId = tracing.currentTraceContext().get().traceIdString();
                  exchange.getResponse().getHeaders().add("X-B3-Traceid", traceId);
                  exchange.getResponse().getHeaders().add("X-Trace-ID", Util.toXRay(traceId));
                }))
        .then(
            Mono.fromRunnable(
                () -> {
                  final String requestURI = exchange.getRequest().getURI().toASCIIString();
                  final String method = exchange.getRequest().getMethodValue();

                  final long requestTimeNano = System.nanoTime() - startNanos;
                  final double requestTimeInMillis = requestTimeNano * 0.000001;
                  final HttpStatus statusCode =
                      Objects.requireNonNull(exchange.getResponse().getStatusCode());
                  final int status = statusCode.value();
                  final String requestTimeInMillisText =
                      String.format("%.03f", requestTimeInMillis);
                  if (requestTimeInMillis > 5000) {
                    log.error(
                        LOG_MESSAGE_FORMAT, method, requestURI, status, requestTimeInMillisText);
                  } else if (statusCode.is4xxClientError() || requestTimeInMillis > 3000) {
                    log.warn(
                        LOG_MESSAGE_FORMAT, method, requestURI, status, requestTimeInMillisText);
                  } else if (statusCode.isError()) {
                    log.error(
                        LOG_MESSAGE_FORMAT, method, requestURI, status, requestTimeInMillisText);
                  } else {
                    log.info(
                        LOG_MESSAGE_FORMAT, method, requestURI, status, requestTimeInMillisText);
                  }
                  // add CORS
                  final HttpHeaders responseHeaders = exchange.getResponse().getHeaders();
                  if (responseHeaders.getAccessControlAllowOrigin() == null) {
                    responseHeaders.setAccessControlAllowOrigin("*");
                  }
                }));
  }
}

In the WebFilter I want to achieve the same thing. Note that the only changes are removing @Component (using it breaks the application), implementing WebFilter, and taking a WebFilterChain chain:

@Slf4j(topic = "request")
@RequiredArgsConstructor
public class PerformanceRequestIDPostFilter implements WebFilter {

  private final Tracing tracing;

  private static final String LOG_MESSAGE_FORMAT = "{} {} {} {}ms";

  @Override
  public Mono<Void> filter(final ServerWebExchange exchange, WebFilterChain chain) {

    final long startNanos = System.nanoTime();
    return chain
        .filter(exchange)
        .doOnEach(
            WebFluxSleuthOperators.withSpanInScope(
                () -> {
                  final String traceId = tracing.currentTraceContext().get().traceIdString();
                  exchange.getResponse().getHeaders().add("X-B3-Traceid", traceId);
                  exchange.getResponse().getHeaders().add("X-Trace-ID", Util.toXRay(traceId));
                }))
        .then(
            Mono.fromRunnable(
                () -> {
                  final String requestURI = exchange.getRequest().getURI().toASCIIString();
                  final String method = exchange.getRequest().getMethodValue();

                  final long requestTimeNano = System.nanoTime() - startNanos;
                  final double requestTimeInMillis = requestTimeNano * 0.000001;
                  final HttpStatus statusCode =
                      Objects.requireNonNull(exchange.getResponse().getStatusCode());
                  final int status = statusCode.value();
                  final String requestTimeInMillisText =
                      String.format("%.03f", requestTimeInMillis);
                  if (requestTimeInMillis > 5000) {
                    log.error(
                        LOG_MESSAGE_FORMAT, method, requestURI, status, requestTimeInMillisText);
                  } else if (statusCode.is4xxClientError() || requestTimeInMillis > 3000) {
                    log.warn(
                        LOG_MESSAGE_FORMAT, method, requestURI, status, requestTimeInMillisText);
                  } else if (statusCode.isError()) {
                    log.error(
                        LOG_MESSAGE_FORMAT, method, requestURI, status, requestTimeInMillisText);
                  } else {
                    log.info(
                        LOG_MESSAGE_FORMAT, method, requestURI, status, requestTimeInMillisText);
                  }
                  // add CORS
                  final HttpHeaders responseHeaders = exchange.getResponse().getHeaders();
                  if (responseHeaders.getAccessControlAllowOrigin() == null) {
                    responseHeaders.setAccessControlAllowOrigin("*");
                  }
                }));
  }
}

I tried calling ServerWebExchangeUtils.isAlreadyRouted in the WebFilter, but it did not work well.

I eventually got my use case working. In the GatewayFilter, I could not get the trace context unless I did this:

    .doOnEach(
            WebFluxSleuthOperators.withSpanInScope(
                () -> {
                  final String traceId = tracing.currentTraceContext().get().traceIdString();
                  exchange.getResponse().getHeaders().add("X-B3-Traceid", traceId);
                  exchange.getResponse().getHeaders().add("X-Trace-ID", Util.toXRay(traceId));
                }))

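But if I put that on the WebFilter, it only runs after the response has been sent. However, a WebFilter actually has the trace context available in its "pre" filter phase. So what I did was remove the GatewayFilter entirely (it was what I originally had, before I learned about WebFilter order which isn't documented).

The experiment I ran was...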
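One way to picture why a header write in the "post" phase comes too late is the toy model below. It is only a sketch: `ToyResponse` and `headersSeenByClient` are hypothetical names, not Spring classes, and the model assumes that headers are fixed at the moment the response is committed. The real framework may behave differently in detail (for example by rejecting the late write rather than ignoring it).

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CommitTimingDemo {

    /** Hypothetical toy response: headers are copied to the "wire" when commit() is called. */
    static class ToyResponse {
        private final Map<String, String> headers = new LinkedHashMap<>();
        private Map<String, String> sent; // stays null until the response is committed

        void addHeader(String name, String value) {
            headers.put(name, value);
        }

        void commit() {
            if (sent == null) {
                sent = new LinkedHashMap<>(headers); // snapshot: later writes are not included
            }
        }

        Map<String, String> sentHeaders() {
            return sent;
        }
    }

    /** Returns the headers the client would see in this simplified model. */
    static Map<String, String> headersSeenByClient(boolean addInPrePhase) {
        ToyResponse response = new ToyResponse();
        if (addInPrePhase) {
            response.addHeader("X-Trace-ID", "abc"); // "pre" phase: before the chain runs
        }
        response.commit(); // stands in for the rest of the chain writing the response
        if (!addInPrePhase) {
            response.addHeader("X-Trace-ID", "abc"); // "post" phase: after the response went out
        }
        return response.sentHeaders();
    }

    public static void main(String[] args) {
        System.out.println("pre phase:  " + headersSeenByClient(true));
        System.out.println("post phase: " + headersSeenByClient(false));
    }
}
```

In this model only the "pre" variant delivers the header, which matches the decision to set the trace headers before calling `chain.filter(exchange)`.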

    // In the WebFilter implementation
    public Mono<Void> filter(final ServerWebExchange exchange, WebFilterChain chain) {
        log.info("WebFilter pre {}", exchange.getRequest().getURI());
        return chain
                .filter(exchange)
                .then(
                        Mono.fromRunnable(() -> log.info("WebFilter post {}", exchange.getRequest().getURI())));
    }

    // In the GlobalFilter implementation
    public Mono<Void> filter(final ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info("GatewayFilter pre {}", exchange.getRequest().getURI());
        return chain
                .filter(exchange)
                .then(
                        Mono.fromRunnable(() -> log.info("GatewayFilter post {}", exchange.getRequest().getURI())));
    }

The log output confirms what was discussed in the comments, along with a few revelations:

... traceID ... WebFilter pre /blah/foo
... traceID ... GatewayFilter pre /foo
... traceID ... GatewayFilter post /foo
... traceID ... WebFilter post /blah/foo
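  1. The WebFilter sees the full URI as originally requested, whereas the gateway filter sees only the part extracted by the Path predicate.
  2. The trace ID is available in the WebFilter pre phase. This solves the puzzle I had earlier, where the trace ID was not available when I ran the GatewayFilter pre-filter.

The end result is a single WebFilter.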
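The nesting seen in the log above can be reproduced with a small, framework-free sketch. It is a synchronous simplification of the reactive chain, assuming the WebFilter wraps the gateway's handling of the request; the `Filter` interface and the `named` and `run` helpers are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class FilterOrderDemo {

    /** Hypothetical stand-in for a filter: "pre" logic, delegate to the next step, then "post" logic. */
    interface Filter {
        void filter(List<String> log, Runnable next);
    }

    /** Builds a filter that records its pre and post phases under the given name. */
    static Filter named(String name) {
        return (log, next) -> {
            log.add(name + " pre");
            next.run();
            log.add(name + " post");
        };
    }

    /** Runs outer around inner around a terminal handler and returns the recorded order. */
    static List<String> run(Filter outer, Filter inner) {
        List<String> log = new ArrayList<>();
        outer.filter(log, () -> inner.filter(log, () -> log.add("proxied call")));
        return log;
    }

    public static void main(String[] args) {
        // The outer filter plays the role of the WebFilter, the inner one the gateway filter.
        run(named("WebFilter"), named("GatewayFilter")).forEach(System.out::println);
    }
}
```

Because the outer filter's "post" step only runs once everything inside it has finished, a single outer filter is enough to time the whole request, including whatever the gateway does.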

@Component
@Slf4j(topic = "request")
@RequiredArgsConstructor
public class PerformanceRequestIDPostFilter implements WebFilter {

  private final Tracing tracing;
  private final ExcludedPathPatterns excludedPathPatterns;

  private static final String LOG_MESSAGE_FORMAT = "{} {} {} {}ms";

  @Override
  public Mono<Void> filter(final ServerWebExchange exchange, WebFilterChain chain) {

    final long startNanos = System.nanoTime();
    final String traceId = tracing.currentTraceContext().get().traceIdString();
    exchange.getResponse().getHeaders().add("X-B3-Traceid", traceId);
    exchange.getResponse().getHeaders().add("X-Trace-ID", Util.toXRay(traceId));
    return chain
        .filter(exchange)
        .then(
            Mono.fromRunnable(
                () -> {
                  if (excludedPathPatterns.isExcludedForServer(
                      exchange.getRequest().getPath().pathWithinApplication())) {
                    return;
                  }
                  final String requestURI = exchange.getRequest().getURI().toASCIIString();
                  final String method = exchange.getRequest().getMethodValue();

                  final long requestTimeNano = System.nanoTime() - startNanos;
                  final double requestTimeInMillis = requestTimeNano * 0.000001;
                  final HttpStatus statusCode =
                      Objects.requireNonNull(exchange.getResponse().getStatusCode());
                  final int status = statusCode.value();
                  final String requestTimeInMillisText =
                      String.format("%.03f", requestTimeInMillis);
                  if (requestTimeInMillis > 5000) {
                    log.error(
                        LOG_MESSAGE_FORMAT, method, requestURI, status, requestTimeInMillisText);
                  } else if (statusCode.is4xxClientError() || requestTimeInMillis > 3000) {
                    log.warn(
                        LOG_MESSAGE_FORMAT, method, requestURI, status, requestTimeInMillisText);
                  } else if (statusCode.isError()) {
                    log.error(
                        LOG_MESSAGE_FORMAT, method, requestURI, status, requestTimeInMillisText);
                  } else {
                    log.info(
                        LOG_MESSAGE_FORMAT, method, requestURI, status, requestTimeInMillisText);
                  }
                }));
  }
}
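
The log-level thresholds in the filter above are easier to check when pulled out into a pure function. The sketch below is one possible extraction, not part of the original filter: `RequestLogLevel`, `Level`, `levelFor`, and `nanosToMillis` are hypothetical names, and plain status-code ranges stand in for `HttpStatus.is4xxClientError()` and `HttpStatus.isError()`.

```java
public class RequestLogLevel {

    enum Level { INFO, WARN, ERROR }

    /** Converts a System.nanoTime() difference to milliseconds. */
    static double nanosToMillis(long nanos) {
        return nanos / 1_000_000.0;
    }

    /** Mirrors the branch order of the filter: slow first, then 4xx or fairly slow, then other errors. */
    static Level levelFor(int status, double elapsedMillis) {
        boolean clientError = status >= 400 && status < 500; // stands in for is4xxClientError()
        boolean anyError = status >= 400 && status < 600;    // stands in for isError()
        if (elapsedMillis > 5000) {
            return Level.ERROR;
        }
        if (clientError || elapsedMillis > 3000) {
            return Level.WARN;
        }
        if (anyError) {
            return Level.ERROR;
        }
        return Level.INFO;
    }

    public static void main(String[] args) {
        System.out.println(levelFor(200, nanosToMillis(12_000_000L)));    // fast success
        System.out.println(levelFor(404, nanosToMillis(12_000_000L)));    // client error
        System.out.println(levelFor(500, nanosToMillis(3_500_000_000L))); // slow server error
    }
}
```

Note that, with this branch order, a 5xx response that takes between 3 and 5 seconds is logged at WARN rather than ERROR, because the duration check comes before the general error check. Swapping the last two conditions would change that if it is not intended.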