在 Spring Boot 中流式响应下游客户端
Stream response to downstream clients in SpringBoot
我有一个控制器代理 api 端点,它接收针对不同服务的不同请求负载。该控制器验证有效负载并根据特定规则添加少量 headers。在当前上下文中,我不想解析从上游服务收到的响应。代理方法应该简单地将响应流式传输到下游客户端,以便它可以很好地扩展而不会在处理大型响应负载时遇到任何内存问题。
我已经实现了这样的方法:
suspend fun proxyRequest(
url: String,
request: ServerHttpRequest,
customHeaders: HttpHeaders = HttpHeaders.EMPTY,
): ResponseEntity<String>? {
val modifiedReqHeaders = getHeadersWithoutOrigin(request, customHeaders)
val uri = URI.create(url)
val webClient = proxyClient.method(request.method!!)
.uri(uri)
.body(request.body)
modifiedReqHeaders.forEach {
val list = it.value.iterator().asSequence().toList()
val ar: Array<String> = list.toTypedArray()
@Suppress("SpreadOperator")
webClient.header(it.key, *ar)
}
return webClient.exchangeToMono { res ->
res.bodyToMono(String::class.java).map { b -> ResponseEntity.status(res.statusCode()).body(b) }
}.awaitFirstOrNull()
}
但这似乎不是流式传输。当我尝试下载大文件时,它抱怨无法保存大数据缓冲区。
有人可以帮助我编写反应式流式方法吗?
这就是我最后做的。
suspend fun proxyRequest(
url: String,
request: ServerHttpRequest,
response: ServerHttpResponse,
customHeaders: HttpHeaders = HttpHeaders.EMPTY,
): Void? {
val modifiedReqHeaders = getHeadersWithoutOrigin(request, customHeaders)
val uri = URI.create(url)
val webClient = proxyClient.method(request.method!!)
.uri(uri)
.body(request.body)
modifiedReqHeaders.forEach {
val list = it.value.iterator().asSequence().toList()
val ar: Array<String> = list.toTypedArray()
@Suppress("SpreadOperator")
webClient.header(it.key, *ar)
}
val respEntity = webClient
.retrieve()
.toEntityFlux<DataBuffer>()
.awaitSingle()
response.apply {
headers.putAll(respEntity.headers)
statusCode = respEntity.statusCode
}
return response.writeWith(respEntity.body ?: Flux.empty()).awaitFirstOrNull()
}
让我知道这是否真的在向下游发送数据并刷新?
您的第一个代码片段因内存问题而失败,因为它在内存中缓冲整个响应 body 作为 String
并在之后转发它。如果响应非常大,您可能会填满整个可用内存。
第二种方法也失败了,因为您没有返回整个 Flux<DataBuffer>
(因此整个响应作为缓冲区),您只返回第一个。这失败了,因为响应不完整。
即使您设法解决了这个特定问题,还有许多其他事项需要注意:
- 您似乎没有返回原始响应 headers,有效地改变了响应内容类型
- 你不应该转发所有传入的响应headers,因为其中一些确实取决于服务器(比如传输编码)
- security-related request/response headers 会怎样?
- 您如何处理跟踪和指标?
您可以看一下 Spring Cloud Gateway 项目,它处理了很多这些细微之处,让您可以操纵 requests/responses。
我有一个控制器代理 api 端点,它接收针对不同服务的不同请求负载。该控制器验证有效负载并根据特定规则添加少量 headers。在当前上下文中,我不想解析从上游服务收到的响应。代理方法应该简单地将响应流式传输到下游客户端,以便它可以很好地扩展而不会在处理大型响应负载时遇到任何内存问题。
我已经实现了这样的方法:
suspend fun proxyRequest(
url: String,
request: ServerHttpRequest,
customHeaders: HttpHeaders = HttpHeaders.EMPTY,
): ResponseEntity<String>? {
val modifiedReqHeaders = getHeadersWithoutOrigin(request, customHeaders)
val uri = URI.create(url)
val webClient = proxyClient.method(request.method!!)
.uri(uri)
.body(request.body)
modifiedReqHeaders.forEach {
val list = it.value.iterator().asSequence().toList()
val ar: Array<String> = list.toTypedArray()
@Suppress("SpreadOperator")
webClient.header(it.key, *ar)
}
return webClient.exchangeToMono { res ->
res.bodyToMono(String::class.java).map { b -> ResponseEntity.status(res.statusCode()).body(b) }
}.awaitFirstOrNull()
}
但这似乎不是流式传输。当我尝试下载大文件时,它抱怨无法保存大数据缓冲区。 有人可以帮助我编写反应式流式方法吗?
这就是我最后做的。
suspend fun proxyRequest(
url: String,
request: ServerHttpRequest,
response: ServerHttpResponse,
customHeaders: HttpHeaders = HttpHeaders.EMPTY,
): Void? {
val modifiedReqHeaders = getHeadersWithoutOrigin(request, customHeaders)
val uri = URI.create(url)
val webClient = proxyClient.method(request.method!!)
.uri(uri)
.body(request.body)
modifiedReqHeaders.forEach {
val list = it.value.iterator().asSequence().toList()
val ar: Array<String> = list.toTypedArray()
@Suppress("SpreadOperator")
webClient.header(it.key, *ar)
}
val respEntity = webClient
.retrieve()
.toEntityFlux<DataBuffer>()
.awaitSingle()
response.apply {
headers.putAll(respEntity.headers)
statusCode = respEntity.statusCode
}
return response.writeWith(respEntity.body ?: Flux.empty()).awaitFirstOrNull()
}
让我知道这是否真的在向下游发送数据并刷新?
您的第一个代码片段因内存问题而失败,因为它在内存中缓冲整个响应 body 作为 String
并在之后转发它。如果响应非常大,您可能会填满整个可用内存。
第二种方法也失败了,因为您没有返回整个 Flux<DataBuffer>
(因此整个响应作为缓冲区),您只返回第一个。这失败了,因为响应不完整。
即使您设法解决了这个特定问题,还有许多其他事项需要注意:
- 您似乎没有返回原始响应 headers,有效地改变了响应内容类型
- 你不应该转发所有传入的响应headers,因为其中一些确实取决于服务器(比如传输编码)
- security-related request/response headers 会怎样?
- 您如何处理跟踪和指标?
您可以看一下 Spring Cloud Gateway 项目,它处理了很多这些细微之处,让您可以操纵 requests/responses。