将 headers 从 Kafka ConsumerRecord 传递到 Kafka ProducerRecord / RestTemplate / Feign

Passing headers From Kafka ConsumerRecord to Kafka ProducerRecord / RestTemplate / Feign

我们计划使用自定义 headers 进行测试,这应该在我们的微服务中传递。我们的应用程序使用 Java/Kotlin 和 Spring Boot.

多亏了 RequestContextHolder 我们可以轻松地从 HTTP-requests (Servlets) 获取 headers 并将它们传递到我们想要的任何地方。例如,我们将它们添加到每个 Feign-request:

class CustomRequestInterceptor : RequestInterceptor {

    override fun apply(template: RequestTemplate) {

        val requestAttributes = RequestContextHolder.getRequestAttributes() as ServletRequestAttributes?

        requestAttributes?.request?.let { request ->
            request.headerNames.toList()
                .forEach { name -> template.header(name, request.getHeader(name)) }
        }
    }
}

对于 Kafka Producers,有 ProducerInterceptor

但是有没有办法捕获 ConsumerRecord 的 Headers 并进一步传递它们?

我们计划使用 ThreadLocal 变量(就像 RequestContextHolder 那样),但是我们没有找到在这个变量中设置和取消设置 Headers 的方法。

例如,我们可以在RecordInterceptor中传递Headers,但是当线程处理完当前记录时,我们无法清除我们的变量,所以下次我们使用这个线程我们可以读错headers(ThreadLocal变量不会被清除)

有什么interfaces/instruments可以帮助我们吗?提前致谢!

2.7 版向拦截器添加了 successfailure 方法。

即将到来的2.8版本增加了afterRecord(在ErrorHandler之后调用)。

https://github.com/spring-projects/spring-kafka/blob/aec0c072ec99ecfaffe89265fc128cc523963211/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java#L50-L93

另一种选择是向容器的通知链添加 MethodInterceptor

https://github.com/spring-projects/spring-kafka/blob/aec0c072ec99ecfaffe89265fc128cc523963211/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java#L847-L852