Avro custom decoding of UUID through Kafka on the consumer end
I wrote a class to custom-encode UUID objects to bytes so they can be sent between Kafka and Avro.
To use it, I put @AvroEncode(using=UUIDAsBytesEncoding.class) above the uuid field of my target object. (This is provided by the Apache Avro reflect library.)
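For reference, a minimal sketch of what the annotated class might look like; the field names follow the registry schema shown further down, and getters/setters are omitted:

import java.util.UUID;
import org.apache.avro.reflect.AvroEncode;

public class Request {
    private String password;
    private String email;

    // the custom encoding supplies its own ["null", "bytes"] union schema for this field
    @AvroEncode(using = UUIDAsBytesEncoding.class)
    private UUID id;
}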
I am having trouble figuring out how to get my consumer to use the custom decoder automatically (or do I have to go in and decode manually?).
Here is my UUIDAsBytesEncoding extends CustomEncoding class:
public class UUIDAsBytesEncoding extends CustomEncoding<UUID> {

    public UUIDAsBytesEncoding() {
        List<Schema> union = Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
        union.get(1).addProp("CustomEncoding", "UUIDAsBytesEncoding");
        schema = Schema.createUnion(union);
    }

    @Override
    protected void write(Object datum, Encoder out) throws IOException {
        if (datum != null) {
            // encode the position of the data in the union
            out.writeLong(1);
            // convert uuid to bytes
            byte[] bytes = new byte[16];
            Conversion.uuidToByteArray(((UUID) datum), bytes, 0, 16);
            // encode length of data
            out.writeLong(16);
            // write the data
            out.writeBytes(bytes);
        } else {
            // position of null in union
            out.writeLong(0);
        }
    }

    @Override
    protected UUID read(Object reuse, Decoder in) throws IOException {
        System.out.println("READING");
        Long size = in.readLong();
        Long leastSig = in.readLong();
        Long mostSig = in.readLong();
        return new UUID(mostSig, leastSig);
    }
}
The write method and the encoding work fine, but the read method is never called during deserialization. How would I implement this on the consumer side?
The schema in the registry looks like this:
{"type":"record","name":"Request","namespace":"xxxxxxx.xxx.xxx","fields":[{"name":"password","type":"string"},{"name":"email","type":"string"},{"name":"id","type":["null",{"type":"bytes","CustomEncoding":"UUIDAsBytesEncoding"}],"default":null}]}
If the consumer can't automatically use that information to apply the UUIDAsBytesEncoding read method, how do I find the data tagged with that label in my consumer?
I am also using the Confluent schema registry.
Any help would be greatly appreciated!
Finally found a solution. The encoding was incorrect: the built-in writeBytes() method automatically writes the length for you.
Then, in the consumer, we have to write the record back out to a binary stream with a GenericDatumWriter and read it back in with a ReflectDatumReader (the decoded message arrives as a generic IndexedRecord, which knows nothing about the reflect annotations). That automatically calls the UUIDAsBytesEncoding read() method and deserializes the UUID.
My consumer looks like this (it runs as part of a consumer-group executor service, walkthrough here); a sketch of how the stream might be wired up follows the method:
/**
 * Start a single consumer instance
 * This will use the schema built into the IndexedRecord to decode and create key/value for the message
 */
public void run() {
    ConsumerIterator it = this.stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata messageAndMetadata = it.next();
        try {
            String key = (String) messageAndMetadata.key();
            IndexedRecord value = (IndexedRecord) messageAndMetadata.message();

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            GenericDatumWriter<Object> genericRecordWriter = new GenericDatumWriter<>(value.getSchema());
            genericRecordWriter.write(value, EncoderFactory.get().directBinaryEncoder(bytes, null));

            ReflectDatumReader<T> reflectDatumReader = new ReflectDatumReader<>(value.getSchema());
            T newObject = reflectDatumReader.read(null, DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));
            IOUtils.closeQuietly(bytes);

            System.out.println("************CONSUMED: " + key + ": " + newObject);
        } catch (SerializationException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    System.out.println("Shutting down Thread: " + this.threadNumber);
}
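For context, here is a hypothetical sketch of how the stream consumed above might be wired up with the old high-level consumer API and Confluent's KafkaAvroDecoder; the topic name, group id, and URLs are placeholders, and the walkthrough code linked above may differ:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.utils.VerifiableProperties;

public class ConsumerSetup {
    public static List<KafkaStream<Object, Object>> createStreams() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");          // placeholder
        props.put("group.id", "request-consumer-group");           // placeholder
        props.put("schema.registry.url", "http://localhost:8081"); // placeholder

        // the Confluent Avro decoder looks up the writer schema in the registry
        VerifiableProperties vProps = new VerifiableProperties(props);
        KafkaAvroDecoder keyDecoder = new KafkaAvroDecoder(vProps);
        KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(vProps);

        ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put("request-topic", 1);                     // placeholder topic, one stream

        // each KafkaStream in the result is what this.stream refers to in run() above
        return connector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder).get("request-topic");
    }
}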
The new UUIDAsBytesEncoding then looks like this:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.reflect.CustomEncoding;
// Conversion here is assumed to be the Apache Commons Lang 3 utility
import org.apache.commons.lang3.Conversion;

public class UUIDAsBytesEncoding extends CustomEncoding<UUID> {

    public UUIDAsBytesEncoding() {
        List<Schema> union = Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
        union.get(1).addProp("CustomEncoding", "UUIDAsBytesEncoding");
        schema = Schema.createUnion(union);
    }

    @Override
    protected void write(Object datum, Encoder out) throws IOException {
        if (datum != null) {
            // encode the position of the data in the union
            out.writeLong(1);
            // convert uuid to bytes
            byte[] bytes = new byte[16];
            Conversion.uuidToByteArray(((UUID) datum), bytes, 0, 16);
            // write the data
            out.writeBytes(bytes);
        } else {
            // position of null in union
            out.writeLong(0);
        }
    }

    @Override
    protected UUID read(Object reuse, Decoder in) throws IOException {
        // get index in union
        int index = in.readIndex();
        if (index == 1) {
            // read in 16 bytes of data
            ByteBuffer b = ByteBuffer.allocate(16);
            in.readBytes(b);
            // convert
            UUID uuid = Conversion.byteArrayToUuid(b.array(), 0);
            return uuid;
        } else {
            // no uuid present
            return null;
        }
    }
}
This also serves as an example of how to implement a CustomEncoding Avro class. The current version of Avro does not ship a built-in UUID serializer, so this is a way to work around that.
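To sanity-check the encoding outside of Kafka, a minimal round-trip sketch can be useful. The Holder class below is hypothetical and exists only for this test; it mirrors the annotated id field of the real record:

import java.io.ByteArrayOutputStream;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.AvroEncode;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;

public class UUIDEncodingRoundTrip {

    // hypothetical minimal record for the test
    static class Holder {
        @AvroEncode(using = UUIDAsBytesEncoding.class)
        UUID id;
    }

    public static void main(String[] args) throws Exception {
        // the reflect schema picks up the @AvroEncode annotation on the field
        Schema schema = ReflectData.get().getSchema(Holder.class);

        Holder original = new Holder();
        original.id = UUID.randomUUID();

        // serialize: the reflect writer delegates the id field to UUIDAsBytesEncoding.write()
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ReflectDatumWriter<Holder> writer = new ReflectDatumWriter<>(schema);
        writer.write(original, EncoderFactory.get().directBinaryEncoder(out, null));

        // deserialize: the reflect reader delegates back to UUIDAsBytesEncoding.read()
        ReflectDatumReader<Holder> reader = new ReflectDatumReader<>(schema);
        Holder copy = reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));

        System.out.println(original.id.equals(copy.id)); // expected: true
    }
}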