使用 WebClient 将在 Spring Webflux 应用程序中收到的请求 headers 传播到下游服务
Using WebClient to propagate request headers received in a Spring WebFlux application to downstream services
我有两种 WebFlux 应用程序:基于注解(annotation-based)的和基于路由(route-based)的。这些应用程序在被调用时会收到一组 headers,我需要使用 WebClient 在下游调用中传播其中的一部分(OpenTracing 相关的 headers)。
如果这些是普通的 Spring WebMvc 应用程序,我会使用过滤器将选定的 headers 保存在 ThreadLocal 中,在 RestTemplate 拦截器中读取它们并发送给后续服务,然后清除 ThreadLocal。
在 WebFlux 应用程序中复制此行为的正确方法是什么?
我使用 Project Reactor 的 Context 解决了这个问题:在 WebFilter 中将 headers 存入 Context,然后在 WebClient 的 ExchangeFilterFunction 中将它们取出。以下是完整的解决方案:
WebFilter
/**
 * [WebFilter] that copies selected incoming request headers into the Reactor context so that
 * code further down the reactive chain (for example a WebClient filter) can read them.
 *
 * Header names are matched case-insensitively against [openTracingHeaders], which is expected to
 * hold lower-case names. Each match is stored in the context under its lower-case name, so a
 * lookup by the configured name succeeds no matter how the caller capitalised the header.
 *
 * @param openTracingHeaders lower-case names of the headers to capture
 */
class OpenTracingFilter(private val openTracingHeaders: Set<String>) : WebFilter {

    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Delegates to [chain] and enriches the subscriber context with the first value of every
     * incoming header whose lower-cased name is contained in [openTracingHeaders].
     *
     * @param exchange current server exchange; only its request headers are read
     * @param chain remaining filter chain
     * @return the chain's completion signal, with the enriched context attached
     */
    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        return chain.filter(exchange)
            .contextWrite { ctx ->
                var updatedContext = ctx
                exchange.request.headers.forEach { name, values ->
                    // lowercase() is locale-invariant, unlike the no-arg toLowerCase().
                    val key = name.lowercase()
                    // Skip headers without any value instead of failing on values[0].
                    val value = values.firstOrNull()
                    if (value != null && key in openTracingHeaders) {
                        logger.debug("Found OpenTracing Header - key {} - value {}", name, value)
                        // Store under the normalised key: readers look the value up by the
                        // configured (lower-case) name, not by the casing used on the wire.
                        updatedContext = updatedContext.put(key, value)
                    }
                }
                updatedContext
            }
    }
}
OpenTracingExchangeFilterFunction
/**
 * WebClient filter that wraps every outgoing exchange in an [OpenTracingClientResponseMono],
 * handing it the request, the next exchange function and the configured header names.
 *
 * @param headers header names passed on to [OpenTracingClientResponseMono]
 */
class OpenTracingExchangeFilterFunction(private val headers: Set<String>) : ExchangeFilterFunction {

    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Logs the invocation and returns a new [OpenTracingClientResponseMono] for this exchange.
     *
     * @param request outgoing client request
     * @param next next exchange function in the WebClient filter chain
     */
    override fun filter(request: ClientRequest, next: ExchangeFunction): Mono<ClientResponse> {
        logger.debug("OpenTracingExchangeFilterFunction - filter()")
        val responseMono = OpenTracingClientResponseMono(request, next, headers)
        return responseMono
    }
}
OpenTracingClientResponseMono
/**
 * [Mono] that, when subscribed, reads the subscriber's Reactor context, copies every configured
 * header found there onto a copy of the request, and then performs the exchange.
 *
 * @param request original outgoing request
 * @param next exchange function the mutated request is handed to
 * @param headersToPropagate context keys, which are also used as the outgoing header names
 */
class OpenTracingClientResponseMono(
    private val request: ClientRequest,
    private val next: ExchangeFunction,
    private val headersToPropagate: Set<String>
) : Mono<ClientResponse>() {

    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Builds the enriched request from the subscriber's context and subscribes [subscriber]
     * to the result of exchanging it.
     */
    override fun subscribe(subscriber: CoreSubscriber<in ClientResponse>) {
        val reactorContext = subscriber.currentContext()
        val enrichedRequest = ClientRequest.from(request)
            .headers { outgoing ->
                for (name in headersToPropagate) {
                    // Only headers actually present in the context are copied.
                    if (!reactorContext.hasKey(name)) continue
                    val value = reactorContext.get<String>(name)
                    logger.debug("Propagating header key {} - value{}", name, value)
                    outgoing[name] = value
                }
            }
            .build()
        next.exchange(enrichedRequest).subscribe(subscriber)
    }
}
OpenTracingConfiguration
/**
 * Spring configuration that wires the header-propagation pieces together: the server-side
 * [WebFilter], the client-side exchange filter, and a [WebClient] that uses that filter.
 * Both filters receive the header set from [OpenTracingConfigurationProperties].
 */
@Configuration
class OpenTracingConfiguration(private val openTracingConfigurationProperties: OpenTracingConfigurationProperties) {

    /** [WebClient] built with the OpenTracing exchange filter applied. */
    @Bean
    fun webClient(): WebClient =
        WebClient.builder()
            .filter(openTracingExchangeFilterFunction())
            .build()

    /** Server-side filter created from the configured header names. */
    @Bean
    fun openTracingFilter(): WebFilter =
        OpenTracingFilter(openTracingConfigurationProperties.headers)

    /** Client-side exchange filter created from the configured header names. */
    @Bean
    fun openTracingExchangeFilterFunction(): OpenTracingExchangeFilterFunction =
        OpenTracingExchangeFilterFunction(openTracingConfigurationProperties.headers)
}
OpenTracingConfigurationProperties
/**
 * Configuration properties bound from the `opentracing` prefix.
 */
@Configuration
@ConfigurationProperties("opentracing")
class OpenTracingConfigurationProperties {
    /**
     * Names of the headers to propagate. Defaults to an empty set so that a missing
     * `opentracing.headers` entry results in nothing being propagated, rather than an
     * UninitializedPropertyAccessException the first time the property is read.
     */
    var headers: Set<String> = emptySet()
}
application.yml
# Header names bound to OpenTracingConfigurationProperties.headers (prefix "opentracing").
# Keep the names lower-case: OpenTracingFilter lower-cases incoming header names before
# comparing them with this list.
opentracing:
  headers:
    - x-request-id
    - x-b3-traceid
    - x-b3-spanid
    - x-b3-parentspanid
    - x-b3-sampled
    - x-b3-flags
    - x-ot-span-context
我需要将 x-request-id header 传递给应用程序的下游服务。为此,我添加了一个 WebFilter,将 x-request-id 写入 Reactor 上下文:
/**
 * [WebFilter] that makes a request id available in the Reactor context under [X_REQUEST_ID].
 *
 * The id is the first value of the incoming `X-REQUEST-ID` header; when that header is missing
 * or blank, a random UUID is generated instead.
 */
class ContextWebFilter : WebFilter {

    /**
     * Resolves the request id for [exchange] and writes it to the context of the chain.
     *
     * @param exchange current server exchange; only its request headers are read
     * @param chain remaining filter chain
     */
    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        val incomingId = exchange.request.headers[X_REQUEST_ID]?.firstOrNull()
        val requestId = if (incomingId.isNullOrBlank()) UUID.randomUUID().toString() else incomingId
        return chain.filter(exchange).contextWrite { ctx -> ctx.put(X_REQUEST_ID, requestId) }
    }

    companion object {
        /** Header name, also used as the Reactor context key. */
        const val X_REQUEST_ID = "X-REQUEST-ID"
    }
}
然后通过 ExchangeFilterFunction 配置 WebClient,使其在传出请求中带上该 header:
// Builds a WebClient whose request processor copies the request id stored in the Reactor
// context (under ContextWebFilter.X_REQUEST_ID) onto each outgoing request.
WebClient.builder()
    .filter(
        ExchangeFilterFunction.ofRequestProcessor { request ->
            Mono.deferContextual { context ->
                val xRId = context.getOrDefault<String>(ContextWebFilter.X_REQUEST_ID, "")
                if (xRId.isNullOrBlank()) {
                    // No id in the context: pass the request through unchanged
                    // rather than sending an empty X-REQUEST-ID header downstream.
                    Mono.just(request)
                } else {
                    logger.debug("Set X-REQUEST-ID={} as a header to outgoing call", xRId)
                    Mono.just(
                        ClientRequest.from(request)
                            .header(ContextWebFilter.X_REQUEST_ID, xRId)
                            .build()
                    )
                }
            }
        }
    )
    .baseUrl("http://localhost:8080")
    .build()
我有两种 WebFlux 应用程序:基于注解(annotation-based)的和基于路由(route-based)的。这些应用程序在被调用时会收到一组 headers,我需要使用 WebClient 在下游调用中传播其中的一部分。
如果这些是普通的 Spring WebMvc 应用程序,我会使用过滤器将选定的 headers 保存在 ThreadLocal 中,在 RestTemplate 拦截器中读取它们并发送给后续服务,然后清除 ThreadLocal。
在 WebFlux 应用程序中复制此行为的正确方法是什么?
我使用 Project Reactor 的 Context 解决了这个问题:在 WebFilter 中将 headers 存入 Context,然后在 WebClient 的 ExchangeFilterFunction 中将它们取出。以下是完整的解决方案:
WebFilter
/**
 * [WebFilter] that copies selected incoming request headers into the Reactor context so that
 * code further down the reactive chain (for example a WebClient filter) can read them.
 *
 * Header names are matched case-insensitively against [openTracingHeaders], which is expected to
 * hold lower-case names. Each match is stored in the context under its lower-case name, so a
 * lookup by the configured name succeeds no matter how the caller capitalised the header.
 *
 * @param openTracingHeaders lower-case names of the headers to capture
 */
class OpenTracingFilter(private val openTracingHeaders: Set<String>) : WebFilter {

    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Delegates to [chain] and enriches the subscriber context with the first value of every
     * incoming header whose lower-cased name is contained in [openTracingHeaders].
     *
     * @param exchange current server exchange; only its request headers are read
     * @param chain remaining filter chain
     * @return the chain's completion signal, with the enriched context attached
     */
    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        return chain.filter(exchange)
            .contextWrite { ctx ->
                var updatedContext = ctx
                exchange.request.headers.forEach { name, values ->
                    // lowercase() is locale-invariant, unlike the no-arg toLowerCase().
                    val key = name.lowercase()
                    // Skip headers without any value instead of failing on values[0].
                    val value = values.firstOrNull()
                    if (value != null && key in openTracingHeaders) {
                        logger.debug("Found OpenTracing Header - key {} - value {}", name, value)
                        // Store under the normalised key: readers look the value up by the
                        // configured (lower-case) name, not by the casing used on the wire.
                        updatedContext = updatedContext.put(key, value)
                    }
                }
                updatedContext
            }
    }
}
OpenTracingExchangeFilterFunction
/**
 * WebClient filter that wraps every outgoing exchange in an [OpenTracingClientResponseMono],
 * handing it the request, the next exchange function and the configured header names.
 *
 * @param headers header names passed on to [OpenTracingClientResponseMono]
 */
class OpenTracingExchangeFilterFunction(private val headers: Set<String>) : ExchangeFilterFunction {

    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Logs the invocation and returns a new [OpenTracingClientResponseMono] for this exchange.
     *
     * @param request outgoing client request
     * @param next next exchange function in the WebClient filter chain
     */
    override fun filter(request: ClientRequest, next: ExchangeFunction): Mono<ClientResponse> {
        logger.debug("OpenTracingExchangeFilterFunction - filter()")
        val responseMono = OpenTracingClientResponseMono(request, next, headers)
        return responseMono
    }
}
OpenTracingClientResponseMono
/**
 * [Mono] that, when subscribed, reads the subscriber's Reactor context, copies every configured
 * header found there onto a copy of the request, and then performs the exchange.
 *
 * @param request original outgoing request
 * @param next exchange function the mutated request is handed to
 * @param headersToPropagate context keys, which are also used as the outgoing header names
 */
class OpenTracingClientResponseMono(
    private val request: ClientRequest,
    private val next: ExchangeFunction,
    private val headersToPropagate: Set<String>
) : Mono<ClientResponse>() {

    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Builds the enriched request from the subscriber's context and subscribes [subscriber]
     * to the result of exchanging it.
     */
    override fun subscribe(subscriber: CoreSubscriber<in ClientResponse>) {
        val reactorContext = subscriber.currentContext()
        val enrichedRequest = ClientRequest.from(request)
            .headers { outgoing ->
                for (name in headersToPropagate) {
                    // Only headers actually present in the context are copied.
                    if (!reactorContext.hasKey(name)) continue
                    val value = reactorContext.get<String>(name)
                    logger.debug("Propagating header key {} - value{}", name, value)
                    outgoing[name] = value
                }
            }
            .build()
        next.exchange(enrichedRequest).subscribe(subscriber)
    }
}
OpenTracingConfiguration
/**
 * Spring configuration that wires the header-propagation pieces together: the server-side
 * [WebFilter], the client-side exchange filter, and a [WebClient] that uses that filter.
 * Both filters receive the header set from [OpenTracingConfigurationProperties].
 */
@Configuration
class OpenTracingConfiguration(private val openTracingConfigurationProperties: OpenTracingConfigurationProperties) {

    /** [WebClient] built with the OpenTracing exchange filter applied. */
    @Bean
    fun webClient(): WebClient =
        WebClient.builder()
            .filter(openTracingExchangeFilterFunction())
            .build()

    /** Server-side filter created from the configured header names. */
    @Bean
    fun openTracingFilter(): WebFilter =
        OpenTracingFilter(openTracingConfigurationProperties.headers)

    /** Client-side exchange filter created from the configured header names. */
    @Bean
    fun openTracingExchangeFilterFunction(): OpenTracingExchangeFilterFunction =
        OpenTracingExchangeFilterFunction(openTracingConfigurationProperties.headers)
}
OpenTracingConfigurationProperties
/**
 * Configuration properties bound from the `opentracing` prefix.
 */
@Configuration
@ConfigurationProperties("opentracing")
class OpenTracingConfigurationProperties {
    /**
     * Names of the headers to propagate. Defaults to an empty set so that a missing
     * `opentracing.headers` entry results in nothing being propagated, rather than an
     * UninitializedPropertyAccessException the first time the property is read.
     */
    var headers: Set<String> = emptySet()
}
application.yml
# Header names bound to OpenTracingConfigurationProperties.headers (prefix "opentracing").
# Keep the names lower-case: OpenTracingFilter lower-cases incoming header names before
# comparing them with this list.
opentracing:
  headers:
    - x-request-id
    - x-b3-traceid
    - x-b3-spanid
    - x-b3-parentspanid
    - x-b3-sampled
    - x-b3-flags
    - x-ot-span-context
我需要将 x-request-id header 传递给应用程序的下游服务。为此,我添加了一个 WebFilter,将 x-request-id 写入 Reactor 上下文:
/**
 * [WebFilter] that makes a request id available in the Reactor context under [X_REQUEST_ID].
 *
 * The id is the first value of the incoming `X-REQUEST-ID` header; when that header is missing
 * or blank, a random UUID is generated instead.
 */
class ContextWebFilter : WebFilter {

    /**
     * Resolves the request id for [exchange] and writes it to the context of the chain.
     *
     * @param exchange current server exchange; only its request headers are read
     * @param chain remaining filter chain
     */
    override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
        val incomingId = exchange.request.headers[X_REQUEST_ID]?.firstOrNull()
        val requestId = if (incomingId.isNullOrBlank()) UUID.randomUUID().toString() else incomingId
        return chain.filter(exchange).contextWrite { ctx -> ctx.put(X_REQUEST_ID, requestId) }
    }

    companion object {
        /** Header name, also used as the Reactor context key. */
        const val X_REQUEST_ID = "X-REQUEST-ID"
    }
}
然后通过 ExchangeFilterFunction 配置 WebClient,使其在传出请求中带上该 header:
// Builds a WebClient whose request processor copies the request id stored in the Reactor
// context (under ContextWebFilter.X_REQUEST_ID) onto each outgoing request.
WebClient.builder()
    .filter(
        ExchangeFilterFunction.ofRequestProcessor { request ->
            Mono.deferContextual { context ->
                val xRId = context.getOrDefault<String>(ContextWebFilter.X_REQUEST_ID, "")
                if (xRId.isNullOrBlank()) {
                    // No id in the context: pass the request through unchanged
                    // rather than sending an empty X-REQUEST-ID header downstream.
                    Mono.just(request)
                } else {
                    logger.debug("Set X-REQUEST-ID={} as a header to outgoing call", xRId)
                    Mono.just(
                        ClientRequest.from(request)
                            .header(ContextWebFilter.X_REQUEST_ID, xRId)
                            .build()
                    )
                }
            }
        }
    )
    .baseUrl("http://localhost:8080")
    .build()