如何 retrieve/set header 在 spring-cloud-stream-binder-kafka-streams:3.1.1 中使用函数式方法

How to retrieve/set header using functional approach in spring-cloud-stream-binder-kafka-streams:3.1.1

我正在使用 spring-cloud-stream-binder-kafka-streams:3.1.1 函数式编程。如何检索处理器函数

中的所有 headers

Java代码

@SpringBootApplication
public class KafkaMessageApplication {
    public static void main(String args[]) {
        SpringApplication.run(KafkaMessageApplication.class, args);
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> process() {
        // TODO investigate headers on the incoming message
        // For example, find partition key on which message was received and publish to same partition key on destination topic
        return input -> input;
    }
}

为了像那样访问headers,您需要在Kafka Streams 中使用low-level processor/transformer API。您可以混合使用 low-level 处理器 API 和 DSL,同时仍将其用作 Spring Cloud Stream 应用程序。有关详细信息,请参阅 this。基本上,您需要在消费者的情况下使用处理器,在函数的情况下使用转换器。处理器是一个终端API,不允许您继续。另一方面,当使用转换器时,您可以在检查 headers 之后将其作为 KStream 继续使用。例如,这里有一个想法:

input -> input
                    .transform(new TransformerSupplier<String, String, KeyValue<String, String>>() {
                        @Override
                        public Transformer<String, String, KeyValue<String, String>> get() {
                            return new Transformer<Object, String, KeyValue<Object, String>>() {
                                ProcessorContext context;
                                @Override
                                public void init(ProcessorContext context) {
                                    this.context = context;
                                }

                                @Override
                                public KeyValue<Object, String> transform(Object key, String value) {

// Here you can access the headers using this.context.headers()
                                    return new KeyValue<>(key, value);
                                }

                                @Override
                                public void close() {

                                }
                            };
                        }
                    })
                    .map(...)
                    .groupBy(...)
                    ...

查看transform方法中的注释。在那里,您可以访问每个传入记录的 headers。

通过查看您的问题,我发现您正在尝试获取传入记录的分区 ID。为此,您可以直接调用 context.partition()。我认为您不需要为此访问 headers。

这是一个关于访问 headers 的 SO 线程。