Avro 特定记录类型与通用记录类型——哪种记录类型最好或者我可以在两者之间进行转换?

Avro specific vs generic record types - which is best or can I convert between?

我们正在努力决定是提供通用记录格式还是提供特定记录格式供我们的客户使用 着眼于提供在线模式注册表,客户端可以在模式更新时访问。 我们希望发送序列化的 blob,前缀为几个字节,表示版本号,因此 schema 从我们的注册表中检索可以自动进行。

现在,我们遇到了说明通用格式的相对适应性的代码示例 schema 改变了,但我们不愿意放弃由特定的提供的类型安全和易用性 格式。

有没有办法两全其美? IE。我们可以使用和操纵特定生成的 类 在内部,然后让他们在序列化之前自动将它们转换为通用记录?
然后客户端将反序列化通用记录(在查找模式之后)。

此外,客户能否在以后将收到的这些通用记录转换为特定记录?一些小的代码示例会有所帮助!

还是我们看错了?

您正在寻找的是 Confluent Schema 注册表服务和有助于与之集成的库。

提供一个示例来使用不断发展的模式编写序列化反序列化 avro 数据。请注意提供来自 Kafka 的示例。

import io.confluent.kafka.serializers.KafkaAvroDeserializer;  
import io.confluent.kafka.serializers.KafkaAvroSerializer; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.commons.codec.DecoderException; 
import org.apache.commons.codec.binary.Hex;

import java.util.HashMap; import java.util.Map;

public class ConfluentSchemaService {

    public static final String TOPIC = "DUMMYTOPIC";

    private KafkaAvroSerializer avroSerializer;
    private KafkaAvroDeserializer avroDeserializer;

    public ConfluentSchemaService(String conFluentSchemaRigistryURL) {

        //PropertiesMap
        Map<String, String> propMap = new HashMap<>();
        propMap.put("schema.registry.url", conFluentSchemaRigistryURL);
        // Output afterDeserialize should be a specific Record and not Generic Record
        propMap.put("specific.avro.reader", "true");

        avroSerializer = new KafkaAvroSerializer();
        avroSerializer.configure(propMap, true);

        avroDeserializer = new KafkaAvroDeserializer();
        avroDeserializer.configure(propMap, true);
    }

    public String hexBytesToString(byte[] inputBytes) {
        return Hex.encodeHexString(inputBytes);
    }

    public byte[] hexStringToBytes(String hexEncodedString) throws DecoderException {
        return Hex.decodeHex(hexEncodedString.toCharArray());
    }

    public byte[] serializeAvroPOJOToBytes(GenericRecord avroRecord) {
        return avroSerializer.serialize(TOPIC, avroRecord);
    }

    public Object deserializeBytesToAvroPOJO(byte[] avroBytearray) {
        return avroDeserializer.deserialize(TOPIC, avroBytearray);
    } }

以下 类 有您要查找的所有代码。 io.confluent.kafka.serializers.KafkaAvroDeserializer;
io.confluent.kafka.serializers.KafkaAvroSerializer;

更多详情请关注link:

http://bytepadding.com/big-data/spark/avro/avro-serialization-de-serialization-using-confluent-schema-registry/

我可以在它们之间转换吗?

我编写了以下 kotlin 代码以从 SpecificRecord 转换为 GenericRecord 并返回 - 通过 JSON.

PositionReport 是使用 gradle 的 avro 插件从 avro 生成的对象 - 它是:


@org.apache.avro.specific.AvroGenerated
public class PositionReport extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
...

使用到的函数如下


 /**
     * Encodes a record in AVRO Compatible JSON, meaning union types
     * are wrapped. For prettier JSON just use the Object Mapper
     * @param pos PositionReport
     * @return String
     */
    private fun PositionReport.toAvroJson() : String {
        val writer = SpecificDatumWriter(PositionReport::class.java)
        val baos = ByteArrayOutputStream()

        val jsonEncoder = EncoderFactory.get().jsonEncoder(this.schema, baos)
        writer.write(this, jsonEncoder)
        jsonEncoder.flush()
        return baos.toString("UTF-8")
    }

    /**
     * Converts from Genreic Record into JSON - Seems smarter, however,
     * to unify this function and the one above but whatevs
     * @param record GenericRecord
     * @param schema Schema
     */
    private fun GenericRecord.toAvroJson(): String {
        val writer = GenericDatumWriter<Any>(this.schema)
        val baos = ByteArrayOutputStream()

        val jsonEncoder = EncoderFactory.get().jsonEncoder(this.schema, baos)
        writer.write(this, jsonEncoder)
        jsonEncoder.flush()
        return baos.toString("UTF-8")
    }

    /**
     * Takes a Generic Record of a position report and hopefully turns
     * it into a position report... maybe it will work
     * @param gen GenericRecord
     * @return PositionReport
     */
    private fun toPosition(gen: GenericRecord) : PositionReport {

        if (gen.schema != PositionReport.getClassSchema()) {
            throw Exception("Cannot convert GenericRecord to PositionReport as the Schemas do not match")
        }

        // We will convert into JSON - and use that to then convert back to the SpecificRecord
        // Probalby there is a better way
        val json = gen.toAvroJson()

        val reader: DatumReader<PositionReport> = SpecificDatumReader(PositionReport::class.java)
        val decoder: Decoder = DecoderFactory.get().jsonDecoder(PositionReport.getClassSchema(), json)
        val pos = reader.read(null, decoder)
        return pos
    }

    /**
     * Converts a Specific Record to a Generic Record (I think)
     * @param pos PositionReport
     * @return GenericData.Record
     */
    private fun toGenericRecord(pos: PositionReport): GenericData.Record {
        val json = pos.toAvroJson()

        val reader : DatumReader<GenericData.Record> = GenericDatumReader(pos.schema)
        val decoder: Decoder = DecoderFactory.get().jsonDecoder(pos.schema, json)
        val datum = reader.read(null, decoder)
        return datum
    }

但是两者之间有一些区别:

  • SpecificRecordInstant 类型的字段将在 GenericRecord 中编码为 long 并且枚举略有不同

因此,例如,在我对该函数的单元测试中,时间字段是这样测试的:

val gen = toGenericRecord(basePosition)
assertEquals(basePosition.getIgtd().toEpochMilli(), gen.get("igtd"))

枚举由字符串验证

val gen = toGenericRecord(basePosition)
assertEquals(basePosition.getSource().toString(), gen.get("source").toString())

所以要在两者之间进行转换,您可以这样做:


val gen = toGenericRecord(basePosition)
val newPos = toPosition(gen)

assertEquals(newPos, basePosition)