使用无形数据类型的通用 Avro Serde

Generic Avro Serde using shapeless-datatype

我正在努力在 Scala 中创建一个通用的 AvroSerde。我将把这个 serde 与 Flink 结合使用,因此这个 serde 本身也应该是可序列化的。 Avro 没有对 Scala 的任何本机支持,但是有一些库可以使用 shapeless 从 case classes 转换为通用记录。注意:这个通用序列化器只会用 case classes 实例化。

首先,我尝试使用 Avro4s 实现这个 serde。通过确保泛型类型上下文绑定到 FromRecordRecordFrom,我很容易地编译了它,但是 FromRecordRecordFrom 都不可序列化,因此我不能在 Flink 中使用这个 serde。

目前,我正在尝试另一个库 shapeless-datatype,它也使用 shapeless。我当前的代码如下所示:

class Serializer[T : TypeTag : ClassTag] {

  //Get type of the class at run time
  val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]

  //Get Avro Type
  val avroType = AvroType[T]

  def serialize(value : T) : Array[Byte] = {
    var schema: Schema = null

    if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
      schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
    } else {
      schema = ReflectData.get().getSchema(inputClassType)
    }

    val out: ByteArrayOutputStream = new ByteArrayOutputStream()
    val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    var writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)

    val genericRecord = avroType.toGenericRecord(value)

    writer.write(genericRecord, encoder)
    encoder.flush()
    out.close()

    out.toByteArray
  }

  def deserialize(message: Array[Byte]) : T = {
    var schema: Schema = null

    if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
      schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
    } else {
      schema = ReflectData.get().getSchema(inputClassType)
    }

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(message, null)

    avroType.fromGenericRecord(datumReader.read(null, decoder)).get
  }


}

所以基本上我创建了一个 AvroType[T],它有两个方法 fromGenericRecordtoGenericRecord (source)。这些方法需要一些隐式:LabelledGeneric.Aux[A, L]ToAvroRecord[L]tt: TypeTag[A]fromL: FromAvroRecord[L]

目前,由于缺少这些隐含函数,这段代码给出了编译错误:

Error:(48, 51) could not find implicit value for parameter gen: shapeless.LabelledGeneric.Aux[T,L]
  val genericRecord = avroType.toGenericRecord(value)

简单地重载 toGenericRecordfromGenericRecord 方法中的隐含函数并不能解决问题,因为那时我需要参数化 serialize[L <: Hlist]deserialize[L <: Hlist] 我不能这样做是因为 Flink 不允许这些方法有类型。

我对 shapeless 和 implicit 都没有什么经验,无法理解我需要哪些上下文边界来解决这个问题,同时还要保持这个 class 可序列化。

希望有人能帮助我或指出一些有用的资源。

谢谢, 沃特

编辑

我不能通过这些方法传递隐式函数,也不能使它们参数化,因为我需要将 serde 基于 Flink 的序列化接口,这迫使我覆盖:byte[] serialize(T element)T deserialize(byte[] message)

如果我尝试将隐式传递给 class 本身,我需要将其更改为:

class Serializer[T : TypeTag : ClassTag, L <: HList](implicit gen: LabelledGeneric.Aux[T, L], toL: ToAvroRecord[L], fromL: FromAvroRecord[L])

但是如果我像这样实例化它:

case class Test(str: String)
val serializer = new Serializer[Test]

我得到这个编译错误:

Error:(29, 26) wrong number of type arguments for shapeless.datatype.avro.Serializer, should be 2
val serializer = new Serializer[Test]

你应该让 Serializer 变成 type class。 (顺便说一下,在没有必要的情况下使用 vars 是一种不好的做法。)

import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{BinaryEncoder, DatumWriter, DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.ReflectData
import org.apache.avro.specific.SpecificRecordBase
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation
import shapeless.datatype.avro.{AvroType, FromAvroRecord, ToAvroRecord}
import shapeless.{HList, LabelledGeneric}  
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.{ClassTag, classTag}

trait Serializer[T] extends SerializationSchema[T] with DeserializationSchema[T] {
  type L <: HList
}

object Serializer {
  type Aux[T, L0 <: HList] = Serializer[T] { type L = L0 }

  def apply[T](implicit serializer: Serializer[T]): Serializer[T] = serializer

  implicit def mkSerializer[T : ClassTag : TypeTag, L0 <: HList](implicit
    gen: LabelledGeneric.Aux[T, L0],
    toL: ToAvroRecord[L0],
    fromL: FromAvroRecord[L0]): Aux[T, L0] =
    new Serializer[T] {
      type L = L0

      //Get type of the class at run time
      val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]

      //Get Avro Type
      val avroType = AvroType[T]

      override def serialize(value : T) : Array[Byte] = {
        val schema: Schema =
          if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
            inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
          else ReflectData.get().getSchema(inputClassType)

        val out: ByteArrayOutputStream = new ByteArrayOutputStream()
        val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
        val writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)

        val genericRecord = avroType.toGenericRecord(value)

        writer.write(genericRecord, encoder)
        encoder.flush()
        out.close()

        out.toByteArray
      }

      override def deserialize(message: Array[Byte]) : T = {
        val schema: Schema =
          if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
            inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
          else ReflectData.get().getSchema(inputClassType)

        val datumReader = new GenericDatumReader[GenericRecord](schema)
        val decoder = DecoderFactory.get().binaryDecoder(message, null)

        avroType.fromGenericRecord(datumReader.read(null, decoder)).get
      }

      override def isEndOfStream(nextElement: T): Boolean = ???

      override def getProducedType: TypeInformation[T] = ???
    }
}

case class Test(str: String)    
val serializer = Serializer[Test]