将 headers 从 Kafka ConsumerRecord 传递到 Kafka ProducerRecord / RestTemplate / Feign
Passing headers From Kafka ConsumerRecord to Kafka ProducerRecord / RestTemplate / Feign
我们计划使用自定义 headers 进行测试,这应该在我们的微服务中传递。我们的应用程序使用 Java/Kotlin 和 Spring Boot.
多亏了 RequestContextHolder 我们可以轻松地从 HTTP-requests (Servlets) 获取 headers 并将它们传递到我们想要的任何地方。例如,我们将它们添加到每个 Feign-request:
class CustomRequestInterceptor : RequestInterceptor {
override fun apply(template: RequestTemplate) {
val requestAttributes = RequestContextHolder.getRequestAttributes() as ServletRequestAttributes?
requestAttributes?.request?.let { request ->
request.headerNames.toList()
.forEach { name -> template.header(name, request.getHeader(name)) }
}
}
}
对于 Kafka Producers,有 ProducerInterceptor。
但是有没有办法捕获 ConsumerRecord 的 Headers 并进一步传递它们?
我们计划使用 ThreadLocal 变量(就像 RequestContextHolder 那样),但是我们没有找到在这个变量中设置和取消设置 Headers 的方法。
例如,我们可以在RecordInterceptor中传递Headers,但是当线程处理完当前记录时,我们无法清除我们的变量,所以下次我们使用这个线程我们可以读错headers(ThreadLocal变量不会被清除)
有什么interfaces/instruments可以帮助我们吗?提前致谢!
2.7 版向拦截器添加了 success
和 failure
方法。
即将到来的2.8版本增加了afterRecord
(在ErrorHandler
之后调用)。
另一种选择是向容器的通知链添加 MethodInterceptor
。
我们计划使用自定义 headers 进行测试,这应该在我们的微服务中传递。我们的应用程序使用 Java/Kotlin 和 Spring Boot.
多亏了 RequestContextHolder 我们可以轻松地从 HTTP-requests (Servlets) 获取 headers 并将它们传递到我们想要的任何地方。例如,我们将它们添加到每个 Feign-request:
class CustomRequestInterceptor : RequestInterceptor {
override fun apply(template: RequestTemplate) {
val requestAttributes = RequestContextHolder.getRequestAttributes() as ServletRequestAttributes?
requestAttributes?.request?.let { request ->
request.headerNames.toList()
.forEach { name -> template.header(name, request.getHeader(name)) }
}
}
}
对于 Kafka Producers,有 ProducerInterceptor。
但是有没有办法捕获 ConsumerRecord 的 Headers 并进一步传递它们?
我们计划使用 ThreadLocal 变量(就像 RequestContextHolder 那样),但是我们没有找到在这个变量中设置和取消设置 Headers 的方法。
例如,我们可以在RecordInterceptor中传递Headers,但是当线程处理完当前记录时,我们无法清除我们的变量,所以下次我们使用这个线程我们可以读错headers(ThreadLocal变量不会被清除)
有什么interfaces/instruments可以帮助我们吗?提前致谢!
2.7 版向拦截器添加了 success
和 failure
方法。
即将到来的2.8版本增加了afterRecord
(在ErrorHandler
之后调用)。
另一种选择是向容器的通知链添加 MethodInterceptor
。