如何删除 Kafka (Spring-boot) 中的 header?
How do I remove the header in a Kafka (Spring-boot)?
我需要向特定的 Kafka 主题发送消息。
我使用以下 KafkaTemplate 来执行此操作:
KafkaTemplate
在Kafka生产者中放入以下参数:
private ProducerFactory<String, RequestDto> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
创建生产者:
public KafkaTemplate<String, RequestDto> kafkaTemplate() {
KafkaTemplate<String, RequestDto> template = new KafkaTemplate<>(producerConfigs());
template.setMessageConverter(new StringJsonMessageConverter());
return template;
}
当我执行“发送”方法时,我有一条消息发送到 Kafka 主题,但同时我发送了 header,其中包含请求的 DTO 文件的路径.
ListenableFuture<SendResult<String, RequestDto>> result = kafkaTemplate.send(topic, requestDto);
Example header in offset explorer
因此 header 应用程序出现问题,这是一个消费者,我对此无能为力。有什么方法可以从查询中删除这个 header 吗?
您可以用 ADD_TYPE_INFO_HEADERS
false
创建 DefaultKafkaProducerFactory by adding JsonSerializer
JsonSerializer jsonSerializer = new JsonSerializer();
jsonSerializer.setAddTypeInfo(false); //
设置为 false 以禁用添加类型信息 headers。
然后使用键和值序列化器
创建DefaultKafkaProducerFactory
return new DefaultKafkaProducerFactory<>(props,new StringSerializer(),jsonSerializer);
您也可以通过 Configuration Properties
通过生产者端禁用此属性
JsonSerializer.ADD_TYPE_INFO_HEADERS (default true): You can set it to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).
并在消费者端忽略
JsonDeserializer.USE_TYPE_INFO_HEADERS (default true): You can set it to false to ignore headers set by the serializer.
我需要向特定的 Kafka 主题发送消息。
我使用以下 KafkaTemplate 来执行此操作:
KafkaTemplate
在Kafka生产者中放入以下参数:
private ProducerFactory<String, RequestDto> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
创建生产者:
public KafkaTemplate<String, RequestDto> kafkaTemplate() {
KafkaTemplate<String, RequestDto> template = new KafkaTemplate<>(producerConfigs());
template.setMessageConverter(new StringJsonMessageConverter());
return template;
}
当我执行“发送”方法时,我有一条消息发送到 Kafka 主题,但同时我发送了 header,其中包含请求的 DTO 文件的路径.
ListenableFuture<SendResult<String, RequestDto>> result = kafkaTemplate.send(topic, requestDto);
Example header in offset explorer
因此 header 应用程序出现问题,这是一个消费者,我对此无能为力。有什么方法可以从查询中删除这个 header 吗?
您可以用 ADD_TYPE_INFO_HEADERS
false
JsonSerializer jsonSerializer = new JsonSerializer();
jsonSerializer.setAddTypeInfo(false); //
设置为 false 以禁用添加类型信息 headers。
然后使用键和值序列化器
创建DefaultKafkaProducerFactory
return new DefaultKafkaProducerFactory<>(props,new StringSerializer(),jsonSerializer);
您也可以通过 Configuration Properties
通过生产者端禁用此属性JsonSerializer.ADD_TYPE_INFO_HEADERS (default true): You can set it to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).
并在消费者端忽略
JsonDeserializer.USE_TYPE_INFO_HEADERS (default true): You can set it to false to ignore headers set by the serializer.