@Header 和 spring 流函数式编程模型

@Header and spring stream functional programming model

有没有办法在下面的kafka消费者代码中使用@Header?我正在使用 Spring Cloud Stream(Kafka Stream 活页夹实现),并且在我的实现之后使用 功能模型 例如。

@Bean
public Consumer<KStream<String, Pojo>> process() {
    return messages -> messages.foreach((k, v) -> process(v));
}

如果将 Spring 用于 apache kafka 那么这可以像

一样简单
@KafkaListener(topics = "${mytopicname}", clientIdPrefix = "${myprefix}", errorHandler = "customEventErrorHandler")
public void processEvent(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                         @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                         @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
                         @Valid Pojo pojo) {
    ...
    // use headers here
    ...
}

否; Kafka Streams 活页夹不基于 Spring 消息传递。

您可以在 Transformer(通过 ProcessorContext)添加到您的信息流中访问 headers、主题等。

您可以将 Kafka 消息通道绑定器与

一起使用
@Bean
public Consumer<Message<Pojo>> process() {
    return message -> ...
}