如何删除 Kafka (Spring-boot) 中的 header?

How do I remove the header in a Kafka (Spring-boot)?

我需要向特定的 Kafka 主题发送消息。 我使用以下 KafkaTemplate 来执行此操作: KafkaTemplate

在Kafka生产者中放入以下参数:

private ProducerFactory<String, RequestDto> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}

创建生产者:

    public KafkaTemplate<String, RequestDto> kafkaTemplate() {
        KafkaTemplate<String, RequestDto> template = new KafkaTemplate<>(producerConfigs());
        template.setMessageConverter(new StringJsonMessageConverter());
        return template;
    }

当我执行“发送”方法时,我有一条消息发送到 Kafka 主题,但同时我发送了 header,其中包含请求的 DTO 文件的路径.

ListenableFuture<SendResult<String, RequestDto>> result = kafkaTemplate.send(topic, requestDto);

Example header in offset explorer

因此 header 应用程序出现问题,这是一个消费者,我对此无能为力。有什么方法可以从查询中删除这个 header 吗?

您可以用 ADD_TYPE_INFO_HEADERS false

创建 DefaultKafkaProducerFactory by adding JsonSerializer
JsonSerializer jsonSerializer = new JsonSerializer();
jsonSerializer.setAddTypeInfo​(false);  //  

设置为 false 以禁用添加类型信息 headers。

然后使用键和值序列化器

创建DefaultKafkaProducerFactory
return new DefaultKafkaProducerFactory<>(props,new StringSerializer(),jsonSerializer);

您也可以通过 Configuration Properties

通过生产者端禁用此属性

JsonSerializer.ADD_TYPE_INFO_HEADERS (default true): You can set it to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).

并在消费者端忽略

JsonDeserializer.USE_TYPE_INFO_HEADERS (default true): You can set it to false to ignore headers set by the serializer.