Kafka Java 生产者 API 无法将键(key)序列化为 Long 或 Int
Kafka Java Producer API unable to serialize key as Long or Int
下面是用 Java 向 Kafka 生产数据的代码:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Minimal Kafka producer example: publishes five records with {@code Long} keys and
 * {@code String} values to {@link #TOPIC} and prints the partition and offset of each one.
 *
 * <p>Keys are written with {@link LongSerializer}, i.e. in binary form rather than as text, so a
 * consumer must be configured with a matching key deserializer to display them as numbers.
 */
public class ExampleClass {

    /** Topic the example records are published to. */
    private static final String TOPIC = "my-example-topic";

    /** Broker address ({@code host:port}) used to bootstrap the producer. */
    private static final String BOOTSTRAP_SERVERS = "confbroker:9092";

    /**
     * Builds a producer that serializes keys as {@code Long} and values as {@code String}.
     *
     * @return a new producer connected to {@link #BOOTSTRAP_SERVERS}; the caller must close it
     */
    private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    /**
     * Sends five records keyed {@code 1001..1005}, waiting for each acknowledgement before
     * sending the next, and prints the metadata returned for every record.
     *
     * @throws Exception if a send fails or the waiting thread is interrupted
     */
    private static void runProducer() throws Exception {
        final long sensorId = 1001L;
        // try-with-resources closes the producer on every exit path. An explicit flush() is
        // unnecessary: each send is already awaited below, and close() waits for pending sends.
        try (Producer<Long, String> producer = createProducer()) {
            for (long index = sensorId; index < sensorId + 5; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index, "This is sensor no: " + index);
                // Blocking on the returned future makes the send synchronous so that the
                // record's partition and offset are available for printing.
                RecordMetadata metadata = producer.send(record).get();
                System.out.printf(
                        "sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
                        record.key(), record.value(), metadata.partition(), metadata.offset());
            }
        }
    }

    /**
     * Entry point; runs the producer once.
     *
     * @param args ignored
     * @throws Exception if producing fails
     */
    public static void main(String... args) throws Exception {
        runProducer();
    }
}
在 Confluent 5.4.0 中运行控制台消费者时,我得到的结果是:
键(key)显示为乱码。
如何才能生成 Int 或 Long 类型的键?
PS:
=> 在 Confluent 5.5 中也有同样的结果。
=> 使用 IntegerSerializer 时结果也相同。
控制台消费者默认使用 StringDeserializer 来反序列化键和值。如果你想将键反序列化为 Long,
就必须在控制台消费者命令中显式指定:
--property key.deserializer=org.apache.kafka.common.serialization.LongDeserializer
下面是用 Java 向 Kafka 生产数据的代码:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Minimal Kafka producer example: publishes five records with {@code Long} keys and
 * {@code String} values to {@link #TOPIC} and prints the partition and offset of each one.
 *
 * <p>Keys are written with {@link LongSerializer}, i.e. in binary form rather than as text, so a
 * consumer must be configured with a matching key deserializer to display them as numbers.
 */
public class ExampleClass {

    /** Topic the example records are published to. */
    private static final String TOPIC = "my-example-topic";

    /** Broker address ({@code host:port}) used to bootstrap the producer. */
    private static final String BOOTSTRAP_SERVERS = "confbroker:9092";

    /**
     * Builds a producer that serializes keys as {@code Long} and values as {@code String}.
     *
     * @return a new producer connected to {@link #BOOTSTRAP_SERVERS}; the caller must close it
     */
    private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    /**
     * Sends five records keyed {@code 1001..1005}, waiting for each acknowledgement before
     * sending the next, and prints the metadata returned for every record.
     *
     * @throws Exception if a send fails or the waiting thread is interrupted
     */
    private static void runProducer() throws Exception {
        final long sensorId = 1001L;
        // try-with-resources closes the producer on every exit path. An explicit flush() is
        // unnecessary: each send is already awaited below, and close() waits for pending sends.
        try (Producer<Long, String> producer = createProducer()) {
            for (long index = sensorId; index < sensorId + 5; index++) {
                final ProducerRecord<Long, String> record =
                        new ProducerRecord<>(TOPIC, index, "This is sensor no: " + index);
                // Blocking on the returned future makes the send synchronous so that the
                // record's partition and offset are available for printing.
                RecordMetadata metadata = producer.send(record).get();
                System.out.printf(
                        "sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
                        record.key(), record.value(), metadata.partition(), metadata.offset());
            }
        }
    }

    /**
     * Entry point; runs the producer once.
     *
     * @param args ignored
     * @throws Exception if producing fails
     */
    public static void main(String... args) throws Exception {
        runProducer();
    }
}
在 Confluent 5.4.0 中运行控制台消费者时,我得到的结果是:
键(key)显示为乱码。
如何才能生成 Int 或 Long 类型的键?
PS:
=> 在 Confluent 5.5 中也有同样的结果。
=> 使用 IntegerSerializer 时结果也相同。
控制台消费者默认使用 StringDeserializer 来反序列化键和值。如果你想将键反序列化为 Long,
就必须在控制台消费者命令中显式指定:
--property key.deserializer=org.apache.kafka.common.serialization.LongDeserializer