如何在Kafka反序列化器中实现反序列化方法?
How to implement deserialize method in Kafka Deserializer?
我正在制作我的第一个 Apache Kafka 消费者。所以这个例子看起来不错。 https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html
我在实施反序列化方法时遇到问题。假设我在客户中有地址字段。我想我应该制作 AddressDeserializer 但如何知道 AddressDeserializer 需要读取多少字节?地址有 3 个字符串字段,但有时其中一些为空。那么我应该传递给它 3*8 字节吗?我想这不是解决方案。另外,buffer.get(nameBytes) 方法对我来说似乎很不自然,因为 in 参数同时是 out 参数,这是不好的做法。这是检索字节的正确方法还是我遗漏了什么?先感谢您。
@Override
public Customer deserialize(String topic, byte[] data) {
int id;
int nameSize;
String name;
try {
if (data == null)
return null;
if (data.length < 8)
throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected");
ByteBuffer buffer = ByteBuffer.wrap(data);
id = buffer.getInt();
String nameSize = buffer.getInt();
byte[] nameBytes = new Array[Byte](nameSize);
buffer.get(nameBytes);
name = new String(nameBytes, 'UTF-8');
return new Customer(id, name); 2
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
这里应该用到Jackson库。添加 ObjectMapper
作为依赖,然后:
return objectMapper.readValue(data, Customer.class);
我正在制作我的第一个 Apache Kafka 消费者。所以这个例子看起来不错。 https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html 我在实施反序列化方法时遇到问题。假设我在客户中有地址字段。我想我应该制作 AddressDeserializer 但如何知道 AddressDeserializer 需要读取多少字节?地址有 3 个字符串字段,但有时其中一些为空。那么我应该传递给它 3*8 字节吗?我想这不是解决方案。另外,buffer.get(nameBytes) 方法对我来说似乎很不自然,因为 in 参数同时是 out 参数,这是不好的做法。这是检索字节的正确方法还是我遗漏了什么?先感谢您。
@Override
public Customer deserialize(String topic, byte[] data) {
int id;
int nameSize;
String name;
try {
if (data == null)
return null;
if (data.length < 8)
throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected");
ByteBuffer buffer = ByteBuffer.wrap(data);
id = buffer.getInt();
String nameSize = buffer.getInt();
byte[] nameBytes = new Array[Byte](nameSize);
buffer.get(nameBytes);
name = new String(nameBytes, 'UTF-8');
return new Customer(id, name); 2
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
这里应该用到Jackson库。添加 ObjectMapper
作为依赖,然后:
return objectMapper.readValue(data, Customer.class);