@Header 和 spring 流函数式编程模型
@Header and spring stream functional programming model
有没有办法在下面的kafka消费者代码中使用@Header?我正在使用 Spring Cloud Stream(Kafka Stream 活页夹实现),并且在我的实现之后使用 功能模型 例如。
@Bean
public Consumer<KStream<String, Pojo>> process() {
return messages -> messages.foreach((k, v) -> process(v));
}
如果将 Spring 用于 apache kafka 那么这可以像
一样简单
@KafkaListener(topics = "${mytopicname}", clientIdPrefix = "${myprefix}", errorHandler = "customEventErrorHandler")
public void processEvent(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
@Valid Pojo pojo) {
...
// use headers here
...
}
否; Kafka Streams 活页夹不基于 Spring 消息传递。
您可以在 Transformer
(通过 ProcessorContext
)添加到您的信息流中访问 headers、主题等。
您可以将 Kafka 消息通道绑定器与
一起使用
@Bean
public Consumer<Message<Pojo>> process() {
return message -> ...
}
有没有办法在下面的kafka消费者代码中使用@Header?我正在使用 Spring Cloud Stream(Kafka Stream 活页夹实现),并且在我的实现之后使用 功能模型 例如。
@Bean
public Consumer<KStream<String, Pojo>> process() {
return messages -> messages.foreach((k, v) -> process(v));
}
如果将 Spring 用于 apache kafka 那么这可以像
一样简单@KafkaListener(topics = "${mytopicname}", clientIdPrefix = "${myprefix}", errorHandler = "customEventErrorHandler")
public void processEvent(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
@Valid Pojo pojo) {
...
// use headers here
...
}
否; Kafka Streams 活页夹不基于 Spring 消息传递。
您可以在 Transformer
(通过 ProcessorContext
)添加到您的信息流中访问 headers、主题等。
您可以将 Kafka 消息通道绑定器与
一起使用@Bean
public Consumer<Message<Pojo>> process() {
return message -> ...
}