如何 retrieve/set header 在 spring-cloud-stream-binder-kafka-streams:3.1.1 中使用函数式方法
How to retrieve/set header using functional approach in spring-cloud-stream-binder-kafka-streams:3.1.1
我正在使用 spring-cloud-stream-binder-kafka-streams:3.1.1
函数式编程。如何检索处理器函数
中的所有 headers
Java代码
@SpringBootApplication
public class KafkaMessageApplication {
public static void main(String args[]) {
SpringApplication.run(KafkaMessageApplication.class, args);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
// TODO investigate headers on the incoming message
// For example, find partition key on which message was received and publish to same partition key on destination topic
return input -> input;
}
}
为了像那样访问headers,您需要在Kafka Streams 中使用low-level processor/transformer API。您可以混合使用 low-level 处理器 API 和 DSL,同时仍将其用作 Spring Cloud Stream 应用程序。有关详细信息,请参阅 this。基本上,您需要在消费者的情况下使用处理器,在函数的情况下使用转换器。处理器是一个终端API,不允许您继续。另一方面,当使用转换器时,您可以在检查 headers 之后将其作为 KStream
继续使用。例如,这里有一个想法:
input -> input
.transform(new TransformerSupplier<String, String, KeyValue<String, String>>() {
@Override
public Transformer<String, String, KeyValue<String, String>> get() {
return new Transformer<Object, String, KeyValue<Object, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// Here you can access the headers using this.context.headers()
return new KeyValue<>(key, value);
}
@Override
public void close() {
}
};
}
})
.map(...)
.groupBy(...)
...
查看transform
方法中的注释。在那里,您可以访问每个传入记录的 headers。
通过查看您的问题,我发现您正在尝试获取传入记录的分区 ID。为此,您可以直接调用 context.partition()。我认为您不需要为此访问 headers。
这是一个关于访问 headers 的 SO 线程。
我正在使用 spring-cloud-stream-binder-kafka-streams:3.1.1
函数式编程。如何检索处理器函数
Java代码
@SpringBootApplication
public class KafkaMessageApplication {
public static void main(String args[]) {
SpringApplication.run(KafkaMessageApplication.class, args);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
// TODO investigate headers on the incoming message
// For example, find partition key on which message was received and publish to same partition key on destination topic
return input -> input;
}
}
为了像那样访问headers,您需要在Kafka Streams 中使用low-level processor/transformer API。您可以混合使用 low-level 处理器 API 和 DSL,同时仍将其用作 Spring Cloud Stream 应用程序。有关详细信息,请参阅 this。基本上,您需要在消费者的情况下使用处理器,在函数的情况下使用转换器。处理器是一个终端API,不允许您继续。另一方面,当使用转换器时,您可以在检查 headers 之后将其作为 KStream
继续使用。例如,这里有一个想法:
input -> input
.transform(new TransformerSupplier<String, String, KeyValue<String, String>>() {
@Override
public Transformer<String, String, KeyValue<String, String>> get() {
return new Transformer<Object, String, KeyValue<Object, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// Here you can access the headers using this.context.headers()
return new KeyValue<>(key, value);
}
@Override
public void close() {
}
};
}
})
.map(...)
.groupBy(...)
...
查看transform
方法中的注释。在那里,您可以访问每个传入记录的 headers。
通过查看您的问题,我发现您正在尝试获取传入记录的分区 ID。为此,您可以直接调用 context.partition()。我认为您不需要为此访问 headers。
这是一个关于访问 headers 的 SO 线程。