在 Kafka 消息中添加自定义 Headers
Adding Custom Headers in Kafka Message
我通过使用 kafka 生产者将文件转换为字节数组来将文件作为消息发送。
我还需要为消息添加一些headers,例如文件名、时间戳等,以便在消费者端我可以根据文件名和其他headers处理消息。
我目前正在做的是创建一个 object 并将原始消息和 headers 包装在其中并将 object 作为消息发送到字节数组中。
我想知道是否有一种方法可以在发布消息时添加自定义 headers?
Kafka 对消息内容是不可知的 并且不提供任何特殊的方法来丰富它,所以这是你需要自己做的事情。处理这些事情的一种常见方法是使用结构化格式,例如 json、avro 或类似的格式,您可以在其中自由定义必要的字段,并且可以轻松地将元数据添加到您的消息中并将其发送给 Kafka 代理.
此答案自 Kafka 0.11 起已过时,请参阅其他答案。
我在从事的项目中遇到过类似的问题,因此我创建了这个简单的库来帮助解决这个问题:https://github.com/leandronunes85/messaging。目前包含基于 Avro 的实现,但可以扩展它以使用您选择的任何其他序列化框架。
您只需为要在流中拥有的对象(基于或不基于 Avro)创建一个(反)序列化器,然后让 AvroMessageSerializer 发挥它的魔力。
这仍然是一个非常年轻的图书馆,但我觉得它可以为很多人节省很多时间!
Kafka v0.11.0.0 添加了对自定义 headers 的支持。
您可以在创建 ProducerRecord 时添加它们,如下所示:
new ProducerRecord(key, value, headers, ...),其中 headers 是 Iterable 类型
有关详细信息,请参阅:
https://issues.apache.org/jira/browse/KAFKA-4208
https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers
您可以创建自己的小型 java 应用程序,以将带有 headers 的消息发送到 kafka。
在intellij或任何支持的IDE中编写以下代码:-
public static void main(String[] args) throws JsonProcessingException,
InterruptedException {
Properties props=new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class.getName());
KafkaProducer<String, JsonNode> producer=new KafkaProducer<String, JsonNode>(props);
String json = "{ \"f1\" : \"v1\" } ";
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree(json);
ProducerRecord<String,JsonNode> record =new ProducerRecord<String,
JsonNode>("test topic", jsonNode);
record.headers().add(new RecordHeader("key","value1".getBytes()));
producer.send(record);
Thread.sleep(10000);
}
String json = "{ \"f1\" : \"v1\" } " - 这是我们要使用 objectMapper 发送到 kafka 并将其转换为 json节点形式。
record.headers().add(new RecordHeader("key","value1".getBytes()))-这是我们发送给kafka的headers数据的key和value .
要验证您的数据,您可以在 kafka 控制中心检查主题并验证发送的 headers。
记录级别 headers 是从 Kafka 0.11.0 引入的。我们可以在每条记录中发送 Headers 的列表。
List<Header> headers = Arrays.asList(new RecordHeader("header_key", "header_value".getBytes()));
ProducerRecord<String, String> record = new ProducerRecord<>("topic", null, "key", "value", headers);
我通过使用 kafka 生产者将文件转换为字节数组来将文件作为消息发送。
我还需要为消息添加一些headers,例如文件名、时间戳等,以便在消费者端我可以根据文件名和其他headers处理消息。
我目前正在做的是创建一个 object 并将原始消息和 headers 包装在其中并将 object 作为消息发送到字节数组中。
我想知道是否有一种方法可以在发布消息时添加自定义 headers?
Kafka 对消息内容是不可知的 并且不提供任何特殊的方法来丰富它,所以这是你需要自己做的事情。处理这些事情的一种常见方法是使用结构化格式,例如 json、avro 或类似的格式,您可以在其中自由定义必要的字段,并且可以轻松地将元数据添加到您的消息中并将其发送给 Kafka 代理.
此答案自 Kafka 0.11 起已过时,请参阅其他答案。
我在从事的项目中遇到过类似的问题,因此我创建了这个简单的库来帮助解决这个问题:https://github.com/leandronunes85/messaging。目前包含基于 Avro 的实现,但可以扩展它以使用您选择的任何其他序列化框架。
您只需为要在流中拥有的对象(基于或不基于 Avro)创建一个(反)序列化器,然后让 AvroMessageSerializer 发挥它的魔力。
这仍然是一个非常年轻的图书馆,但我觉得它可以为很多人节省很多时间!
Kafka v0.11.0.0 添加了对自定义 headers 的支持。
您可以在创建 ProducerRecord 时添加它们,如下所示:
new ProducerRecord(key, value, headers, ...),其中 headers 是 Iterable
有关详细信息,请参阅:
https://issues.apache.org/jira/browse/KAFKA-4208
https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers
您可以创建自己的小型 java 应用程序,以将带有 headers 的消息发送到 kafka。 在intellij或任何支持的IDE中编写以下代码:-
public static void main(String[] args) throws JsonProcessingException,
InterruptedException {
Properties props=new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class.getName());
KafkaProducer<String, JsonNode> producer=new KafkaProducer<String, JsonNode>(props);
String json = "{ \"f1\" : \"v1\" } ";
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree(json);
ProducerRecord<String,JsonNode> record =new ProducerRecord<String,
JsonNode>("test topic", jsonNode);
record.headers().add(new RecordHeader("key","value1".getBytes()));
producer.send(record);
Thread.sleep(10000);
}
String json = "{ \"f1\" : \"v1\" } " - 这是我们要使用 objectMapper 发送到 kafka 并将其转换为 json节点形式。 record.headers().add(new RecordHeader("key","value1".getBytes()))-这是我们发送给kafka的headers数据的key和value . 要验证您的数据,您可以在 kafka 控制中心检查主题并验证发送的 headers。
记录级别 headers 是从 Kafka 0.11.0 引入的。我们可以在每条记录中发送 Headers 的列表。
List<Header> headers = Arrays.asList(new RecordHeader("header_key", "header_value".getBytes()));
ProducerRecord<String, String> record = new ProducerRecord<>("topic", null, "key", "value", headers);