使用 Spring Kafka 添加自定义 header
Adding custom header using Spring Kafka
我计划使用 Spring Kafka 客户端在 Spring Boot 应用程序中使用和生成来自 kafka 设置的消息。我在 Kafka 0.11 中看到了对自定义 header 的支持,详见 here。虽然它可用于本地 Kafka 生产者和消费者,但我在 Spring Kafka 中看不到对 adding/reading 自定义 header 的支持。
我正在尝试根据我希望存储在消息 header 中的重试计数为消息实现 DLQ,而无需解析负载。
好吧,Spring Kafka 从 2.0 版本开始提供 headers 支持:https://docs.spring.io/spring-kafka/docs/2.1.2.RELEASE/reference/html/_reference.html#headers
您可以拥有那个 KafkaHeaderMapper
实例并使用它来填充 headers 到 Message
,然后再通过 KafkaTemplate.send(Message<?> message)
发送它。或者你可以使用普通的 KafkaTemplate.send(ProducerRecord<K, V> record)
.
当您使用 KafkaMessageListenerContainer
接收记录时,可以通过注入 RecordMessagingMessageListenerAdapter
的 MessagingMessageConverter
提供 KafkaHeaderMapper
。
因此,任何自定义 headers 都可以通过任何一种方式进行转移。
当我偶然发现这个问题时,我正在寻找答案。但是我使用的是 ProducerRecord<?, ?>
class 而不是 Message<?>
,因此 header 映射器似乎不相关。
这是我添加自定义的方法 header:
var record = new ProducerRecord<String, String>(topicName, "Hello World");
record.headers().add("foo", "bar".getBytes());
kafkaTemplate.send(record);
现在阅读 headers(在使用之前),我添加了一个自定义拦截器。
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@Slf4j
public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {
@Override
public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
Set<TopicPartition> partitions = records.partitions();
partitions.forEach(partition -> interceptRecordsFromPartition(records.records(partition)));
return records;
}
private void interceptRecordsFromPartition(List<ConsumerRecord<Object, Object>> records) {
records.forEach(record -> {
var myHeaders = new ArrayList<Header>();
record.headers().headers("MyHeader").forEach(myHeaders::add);
log.info("My Headers: {}", myHeaders);
// Do with header as you see fit
});
}
@Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
@Override public void close() {}
@Override public void configure(Map<String, ?> configs) {}
}
最后一点是使用以下(Spring 引导)配置向 Kafka 消费者容器注册此拦截器:
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@Configuration
public class MessagingConfiguration {
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
Map<String, Object> consumerProperties = properties.buildConsumerProperties();
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
}
我计划使用 Spring Kafka 客户端在 Spring Boot 应用程序中使用和生成来自 kafka 设置的消息。我在 Kafka 0.11 中看到了对自定义 header 的支持,详见 here。虽然它可用于本地 Kafka 生产者和消费者,但我在 Spring Kafka 中看不到对 adding/reading 自定义 header 的支持。
我正在尝试根据我希望存储在消息 header 中的重试计数为消息实现 DLQ,而无需解析负载。
好吧,Spring Kafka 从 2.0 版本开始提供 headers 支持:https://docs.spring.io/spring-kafka/docs/2.1.2.RELEASE/reference/html/_reference.html#headers
您可以拥有那个 KafkaHeaderMapper
实例并使用它来填充 headers 到 Message
,然后再通过 KafkaTemplate.send(Message<?> message)
发送它。或者你可以使用普通的 KafkaTemplate.send(ProducerRecord<K, V> record)
.
当您使用 KafkaMessageListenerContainer
接收记录时,可以通过注入 RecordMessagingMessageListenerAdapter
的 MessagingMessageConverter
提供 KafkaHeaderMapper
。
因此,任何自定义 headers 都可以通过任何一种方式进行转移。
当我偶然发现这个问题时,我正在寻找答案。但是我使用的是 ProducerRecord<?, ?>
class 而不是 Message<?>
,因此 header 映射器似乎不相关。
这是我添加自定义的方法 header:
var record = new ProducerRecord<String, String>(topicName, "Hello World");
record.headers().add("foo", "bar".getBytes());
kafkaTemplate.send(record);
现在阅读 headers(在使用之前),我添加了一个自定义拦截器。
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@Slf4j
public class MyConsumerInterceptor implements ConsumerInterceptor<Object, Object> {
@Override
public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) {
Set<TopicPartition> partitions = records.partitions();
partitions.forEach(partition -> interceptRecordsFromPartition(records.records(partition)));
return records;
}
private void interceptRecordsFromPartition(List<ConsumerRecord<Object, Object>> records) {
records.forEach(record -> {
var myHeaders = new ArrayList<Header>();
record.headers().headers("MyHeader").forEach(myHeaders::add);
log.info("My Headers: {}", myHeaders);
// Do with header as you see fit
});
}
@Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
@Override public void close() {}
@Override public void configure(Map<String, ?> configs) {}
}
最后一点是使用以下(Spring 引导)配置向 Kafka 消费者容器注册此拦截器:
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@Configuration
public class MessagingConfiguration {
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
Map<String, Object> consumerProperties = properties.buildConsumerProperties();
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
}