How to write custom GlobalFilter for checking request body in Spring Cloud Gateway?

I want to validate the request body in a GlobalFilter.

I need to read two HTTP headers that contain checksums of the body and compare them against the body itself:

internal class MyFilter : GlobalFilter {

    override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain) =
        ByteArrayDecoder()
            .decodeToMono(
                exchange.request.body,
                ResolvableType.forClass(ByteBuffer::class.java),
                exchange.request.headers.contentType,
                null
            )
            .flatMap { /* my logic checking body against request headers */ chain.filter(exchange) }
}

The problem is that decodeToMono hangs and the request is never forwarded.

How do I decode the body correctly?

I have managed to write a filter that does not hang after reading the body:

interface BodyFilter {
    fun filter(
        body: Mono<ByteArrayResource>,
        exchange: ServerWebExchange,
        passRequestFunction: () -> Mono<Void>
    ): Mono<Void>
}

class HeaderAndBodyGlobalFilter(private val bodyFilter: BodyFilter) : GlobalFilter {

    private val messageReaders: List<HttpMessageReader<*>> = HandlerStrategies.withDefaults().messageReaders()

    override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
        val serverRequest: ServerRequest = ServerRequest.create(exchange, messageReaders)
        val body: Mono<ByteArrayResource> = serverRequest.bodyToMono<ByteArrayResource>(ByteArrayResource::class.java)
        return bodyFilter.filter(body, exchange) { reconstructRequest(body, exchange, chain) }
    }

    private fun reconstructRequest(
        body: Mono<ByteArrayResource>,
        exchange: ServerWebExchange,
        chain: GatewayFilterChain
    ): Mono<Void> {
        val headers: HttpHeaders = writableHttpHeaders(exchange.request.headers)
        val outputMessage = CachedBodyOutputMessage(exchange, headers)

        return BodyInserters.fromPublisher(
            body,
            ByteArrayResource::class.java
        ).insert(outputMessage, BodyInserterContext())
            .then(Mono.defer {
                val decorator: ServerHttpRequestDecorator = decorate(
                    exchange, headers, outputMessage
                )
                chain
                    .filter(exchange.mutate().request(decorator).build())
            })
    }

    private fun decorate(
        exchange: ServerWebExchange,
        headers: HttpHeaders,
        outputMessage: CachedBodyOutputMessage
    ): ServerHttpRequestDecorator {
        return object : ServerHttpRequestDecorator(exchange.request) {
            override fun getHeaders(): HttpHeaders {
                val contentLength = headers.contentLength
                val httpHeaders = HttpHeaders()
                httpHeaders.putAll(super.getHeaders())
                if (contentLength > 0) {
                    httpHeaders.contentLength = contentLength
                } else {
                    // TODO: this causes a 'HTTP/1.1 411 Length Required'
                    // on httpbin.org
                    httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked")
                }
                return httpHeaders
            }

            override fun getBody(): Flux<DataBuffer> {
                return outputMessage.body
            }
        }
    }
}

The BodyFilter implementation then returns Mono.empty() on failure, or calls passRequestFunction on success.
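
The checksum comparison that such a BodyFilter performs might look roughly like the sketch below. It is only an illustration: the header names `X-Body-MD5` and `X-Body-SHA256`, the choice of MD5 and SHA-256, and the hex encoding are all assumptions, since the post does not say which headers or algorithms are involved. It uses only the JDK so it can be run without Spring on the classpath.

```kotlin
import java.security.MessageDigest

// Hypothetical header names; the post does not say which headers carry the checksums.
const val MD5_HEADER = "X-Body-MD5"
const val SHA256_HEADER = "X-Body-SHA256"

/** Hex-encodes the digest of [body] computed with the given JCA [algorithm] name. */
fun hexDigest(algorithm: String, body: ByteArray): String =
    MessageDigest.getInstance(algorithm)
        .digest(body)
        .joinToString("") { "%02x".format(it) }

/** Returns true only if both checksum headers are present and match the body. */
fun checksumsMatch(headers: Map<String, String>, body: ByteArray): Boolean {
    val expectedMd5 = headers[MD5_HEADER] ?: return false
    val expectedSha256 = headers[SHA256_HEADER] ?: return false
    return expectedMd5.equals(hexDigest("MD5", body), ignoreCase = true) &&
        expectedSha256.equals(hexDigest("SHA-256", body), ignoreCase = true)
}

fun main() {
    val body = "hello".toByteArray(Charsets.UTF_8)
    val headers = mapOf(
        MD5_HEADER to hexDigest("MD5", body),
        SHA256_HEADER to hexDigest("SHA-256", body)
    )
    println(checksumsMatch(headers, body))                      // checksums computed from this body
    println(checksumsMatch(headers, "tampered".toByteArray()))  // body differs from the checksums
}
```

Inside a BodyFilter implementation, a helper like this would presumably be called from `body.flatMap { ... }` with the bytes from `ByteArrayResource.byteArray` and the header values from `exchange.request.headers.getFirst(...)`, calling `passRequestFunction()` when it returns true and returning `Mono.empty()` otherwise.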