向 Kafka 生成记录
Produce Record to Kafka
我正在尝试在不传递任何分区值的情况下向 Kafka 生成记录,但想发送一个 header 并且我有以下构造方法来向 Kafka 生成记录:
ProducerRecord(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V value)
创建具有指定时间戳的记录,以发送到指定的主题和分区
ProducerRecord(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V value, java.lang.Iterable<Header> headers)
创建具有指定时间戳的记录,以发送到指定的主题和分区
ProducerRecord(java.lang.String topic, java.lang.Integer partition, K key, V value)
创建要发送到指定主题和分区的记录
ProducerRecord(java.lang.String topic, java.lang.Integer partition, K key, V value, java.lang.Iterable<Header> headers)
创建要发送到指定主题和分区的记录
ProducerRecord(java.lang.String topic, K key, V value)
创建要发送到 Kafka 的记录
ProducerRecord(java.lang.String topic, V value)
创建一个没有键的记录
在上述所有方法中,我无法在不发送分区值的情况下发送 header,如果我将分区设置为 null,我将得到 NullPointerException
.
你能告诉我如何通过发送 header 而不是分区值来在 Kafka 中生成记录吗?
ProducerRecord
object 有方法 headers()
returns Headers
接口,它又允许你 add()
任何 header你喜欢。
Kafka ProducerRecord 实例提供了 Headers 实例,我们可以在其中添加键值对。
您可以通过Kafka源码获取完整信息。
例子
我尝试使用 kafka-clients-0.8.2.0
,但该功能在该版本中不可用。所以你还需要检查 Kafka 的客户端版本。
下面给出的例子是使用 Kafka Client - 2.1.0
.
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, message);
Headers headers = record.headers();
headers.add(KafkaHeaders.HEADER_CLIENT_IP, Strings.bytes(Network.localHostAddress()));
headers.add(KafkaHeaders.HEADER_CLIENT, Strings.bytes(logManager.appName));
producer.send(record);
我正在尝试在不传递任何分区值的情况下向 Kafka 生成记录,但想发送一个 header 并且我有以下构造方法来向 Kafka 生成记录:
ProducerRecord(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V value)
创建具有指定时间戳的记录,以发送到指定的主题和分区
ProducerRecord(java.lang.String topic, java.lang.Integer partition, java.lang.Long timestamp, K key, V value, java.lang.Iterable<Header> headers)
创建具有指定时间戳的记录,以发送到指定的主题和分区
ProducerRecord(java.lang.String topic, java.lang.Integer partition, K key, V value)
创建要发送到指定主题和分区的记录
ProducerRecord(java.lang.String topic, java.lang.Integer partition, K key, V value, java.lang.Iterable<Header> headers)
创建要发送到指定主题和分区的记录
ProducerRecord(java.lang.String topic, K key, V value)
创建要发送到 Kafka 的记录
ProducerRecord(java.lang.String topic, V value)
创建一个没有键的记录
在上述所有方法中,我无法在不发送分区值的情况下发送 header,如果我将分区设置为 null,我将得到 NullPointerException
.
你能告诉我如何通过发送 header 而不是分区值来在 Kafka 中生成记录吗?
ProducerRecord
object 有方法 headers()
returns Headers
接口,它又允许你 add()
任何 header你喜欢。
Kafka ProducerRecord 实例提供了 Headers 实例,我们可以在其中添加键值对。
您可以通过Kafka源码获取完整信息。
例子
我尝试使用 kafka-clients-0.8.2.0
,但该功能在该版本中不可用。所以你还需要检查 Kafka 的客户端版本。
下面给出的例子是使用 Kafka Client - 2.1.0
.
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, message);
Headers headers = record.headers();
headers.add(KafkaHeaders.HEADER_CLIENT_IP, Strings.bytes(Network.localHostAddress()));
headers.add(KafkaHeaders.HEADER_CLIENT, Strings.bytes(logManager.appName));
producer.send(record);