在 Kafka 消息中添加自定义 Headers

Adding Custom Headers in Kafka Message

我通过使用 kafka 生产者将文件转换为字节数组来将文件作为消息发送。

我还需要为消息添加一些headers,例如文件名、时间戳等,以便在消费者端我可以根据文件名和其他headers处理消息。

我目前正在做的是创建一个 object 并将原始消息和 headers 包装在其中并将 object 作为消息发送到字节数组中。

我想知道是否有一种方法可以在发布消息时添加自定义 headers?

Kafka 对消息内容是不可知的 并且不提供任何特殊的方法来丰富它,所以这是你需要自己做的事情。处理这些事情的一种常见方法是使用结构化格式,例如 json、avro 或类似的格式,您可以在其中自由定义必要的字段,并且可以轻松地将元数据添加到您的消息中并将其发送给 Kafka 代理.

此答案自 Kafka 0.11 起已过时,请参阅其他答案。

我在从事的项目中遇到过类似的问题,因此我创建了这个简单的库来帮助解决这个问题:https://github.com/leandronunes85/messaging。目前包含基于 Avro 的实现,但可以扩展它以使用您选择的任何其他序列化框架。

您只需为要在流中拥有的对象(基于或不基于 Avro)创建一个(反)序列化器,然后让 AvroMessageSerializer 发挥它的魔力。

这仍然是一个非常年轻的图书馆,但我觉得它可以为很多人节省很多时间!

Kafka v0.11.0.0 添加了对自定义 headers 的支持。

您可以在创建 ProducerRecord 时添加它们,如下所示:

new ProducerRecord(key, value, headers, ...),其中 headers 是 Iterable

类型

有关详细信息,请参阅:

https://issues.apache.org/jira/browse/KAFKA-4208

https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers

您可以创建自己的小型 java 应用程序,以将带有 headers 的消息发送到 kafka。 在intellij或任何支持的IDE中编写以下代码:-

public static void main(String[] args) throws JsonProcessingException, 
InterruptedException {
  Properties props=new Properties();       
  props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
  props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
  JsonSerializer.class.getName());
  props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class.getName());

  KafkaProducer<String, JsonNode> producer=new KafkaProducer<String, JsonNode>(props);
  String json = "{ \"f1\" : \"v1\" } ";
  ObjectMapper mapper = new ObjectMapper();
  JsonNode jsonNode = mapper.readTree(json);
  ProducerRecord<String,JsonNode> record =new ProducerRecord<String, 
  JsonNode>("test topic", jsonNode);
  record.headers().add(new RecordHeader("key","value1".getBytes()));
  producer.send(record);
  Thread.sleep(10000);
}

String json = "{ \"f1\" : \"v1\" } " - 这是我们要使用 objectMapper 发送到 kafka 并将其转换为 json节点形式。 record.headers().add(new RecordHeader("key","value1".getBytes()))-这是我们发送给kafka的headers数据的key和value . 要验证您的数据,您可以在 kafka 控制中心检查主题并验证发送的 headers。

记录级别 headers 是从 Kafka 0.11.0 引入的。我们可以在每条记录中发送 Headers 的列表。

List<Header> headers = Arrays.asList(new RecordHeader("header_key", "header_value".getBytes()));
ProducerRecord<String, String> record = new ProducerRecord<>("topic", null, "key", "value", headers);