使用无形数据类型的通用 Avro Serde
Generic Avro Serde using shapeless-datatype
我正在努力在 Scala 中创建一个通用的 AvroSerde。我将把这个 serde 与 Flink 结合使用,因此这个 serde 本身也应该是可序列化的。 Avro 没有对 Scala 的任何本机支持,但是有一些库可以使用 shapeless 从 case classes 转换为通用记录。注意:这个通用序列化器只会用 case classes 实例化。
首先,我尝试使用 Avro4s 实现这个 serde。通过确保泛型类型上下文绑定到 FromRecord
和 RecordFrom
,我很容易地编译了它,但是 FromRecord
和 RecordFrom
都不可序列化,因此我不能在 Flink 中使用这个 serde。
目前,我正在尝试另一个库 shapeless-datatype,它也使用 shapeless。我当前的代码如下所示:
class Serializer[T : TypeTag : ClassTag] {
//Get type of the class at run time
val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
//Get Avro Type
val avroType = AvroType[T]
def serialize(value : T) : Array[Byte] = {
var schema: Schema = null
if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
} else {
schema = ReflectData.get().getSchema(inputClassType)
}
val out: ByteArrayOutputStream = new ByteArrayOutputStream()
val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
var writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)
val genericRecord = avroType.toGenericRecord(value)
writer.write(genericRecord, encoder)
encoder.flush()
out.close()
out.toByteArray
}
def deserialize(message: Array[Byte]) : T = {
var schema: Schema = null
if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
} else {
schema = ReflectData.get().getSchema(inputClassType)
}
val datumReader = new GenericDatumReader[GenericRecord](schema)
val decoder = DecoderFactory.get().binaryDecoder(message, null)
avroType.fromGenericRecord(datumReader.read(null, decoder)).get
}
}
所以基本上我创建了一个 AvroType[T]
,它有两个方法 fromGenericRecord
和 toGenericRecord
(source)。这些方法需要一些隐式:LabelledGeneric.Aux[A, L]
、ToAvroRecord[L]
、tt: TypeTag[A]
和 fromL: FromAvroRecord[L]
。
目前,由于缺少这些隐含函数,这段代码给出了编译错误:
Error:(48, 51) could not find implicit value for parameter gen: shapeless.LabelledGeneric.Aux[T,L]
val genericRecord = avroType.toGenericRecord(value)
简单地重载 toGenericRecord
和 fromGenericRecord
方法中的隐含函数并不能解决问题,因为那时我需要参数化 serialize[L <: Hlist]
和 deserialize[L <: Hlist]
我不能这样做是因为 Flink 不允许这些方法有类型。
我对 shapeless 和 implicit 都没有什么经验,无法理解我需要哪些上下文边界来解决这个问题,同时还要保持这个 class 可序列化。
希望有人能帮助我或指出一些有用的资源。
谢谢,
沃特
编辑
我不能通过这些方法传递隐式函数,也不能使它们参数化,因为我需要将 serde 基于 Flink 的序列化接口,这迫使我覆盖:byte[] serialize(T element)
和 T deserialize(byte[] message)
如果我尝试将隐式传递给 class 本身,我需要将其更改为:
class Serializer[T : TypeTag : ClassTag, L <: HList](implicit gen: LabelledGeneric.Aux[T, L], toL: ToAvroRecord[L], fromL: FromAvroRecord[L])
但是如果我像这样实例化它:
case class Test(str: String)
val serializer = new Serializer[Test]
我得到这个编译错误:
Error:(29, 26) wrong number of type arguments for shapeless.datatype.avro.Serializer, should be 2
val serializer = new Serializer[Test]
你应该让 Serializer
变成 type class。 (顺便说一下,在没有必要的情况下使用 var
s 是一种不好的做法。)
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{BinaryEncoder, DatumWriter, DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.ReflectData
import org.apache.avro.specific.SpecificRecordBase
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation
import shapeless.datatype.avro.{AvroType, FromAvroRecord, ToAvroRecord}
import shapeless.{HList, LabelledGeneric}
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.{ClassTag, classTag}
trait Serializer[T] extends SerializationSchema[T] with DeserializationSchema[T] {
type L <: HList
}
object Serializer {
type Aux[T, L0 <: HList] = Serializer[T] { type L = L0 }
def apply[T](implicit serializer: Serializer[T]): Serializer[T] = serializer
implicit def mkSerializer[T : ClassTag : TypeTag, L0 <: HList](implicit
gen: LabelledGeneric.Aux[T, L0],
toL: ToAvroRecord[L0],
fromL: FromAvroRecord[L0]): Aux[T, L0] =
new Serializer[T] {
type L = L0
//Get type of the class at run time
val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
//Get Avro Type
val avroType = AvroType[T]
override def serialize(value : T) : Array[Byte] = {
val schema: Schema =
if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
else ReflectData.get().getSchema(inputClassType)
val out: ByteArrayOutputStream = new ByteArrayOutputStream()
val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
val writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)
val genericRecord = avroType.toGenericRecord(value)
writer.write(genericRecord, encoder)
encoder.flush()
out.close()
out.toByteArray
}
override def deserialize(message: Array[Byte]) : T = {
val schema: Schema =
if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
else ReflectData.get().getSchema(inputClassType)
val datumReader = new GenericDatumReader[GenericRecord](schema)
val decoder = DecoderFactory.get().binaryDecoder(message, null)
avroType.fromGenericRecord(datumReader.read(null, decoder)).get
}
override def isEndOfStream(nextElement: T): Boolean = ???
override def getProducedType: TypeInformation[T] = ???
}
}
case class Test(str: String)
val serializer = Serializer[Test]
我正在努力在 Scala 中创建一个通用的 AvroSerde。我将把这个 serde 与 Flink 结合使用,因此这个 serde 本身也应该是可序列化的。 Avro 没有对 Scala 的任何本机支持,但是有一些库可以使用 shapeless 从 case classes 转换为通用记录。注意:这个通用序列化器只会用 case classes 实例化。
首先,我尝试使用 Avro4s 实现这个 serde。通过确保泛型类型上下文绑定到 FromRecord
和 RecordFrom
,我很容易地编译了它,但是 FromRecord
和 RecordFrom
都不可序列化,因此我不能在 Flink 中使用这个 serde。
目前,我正在尝试另一个库 shapeless-datatype,它也使用 shapeless。我当前的代码如下所示:
class Serializer[T : TypeTag : ClassTag] {
//Get type of the class at run time
val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
//Get Avro Type
val avroType = AvroType[T]
def serialize(value : T) : Array[Byte] = {
var schema: Schema = null
if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
} else {
schema = ReflectData.get().getSchema(inputClassType)
}
val out: ByteArrayOutputStream = new ByteArrayOutputStream()
val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
var writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)
val genericRecord = avroType.toGenericRecord(value)
writer.write(genericRecord, encoder)
encoder.flush()
out.close()
out.toByteArray
}
def deserialize(message: Array[Byte]) : T = {
var schema: Schema = null
if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
} else {
schema = ReflectData.get().getSchema(inputClassType)
}
val datumReader = new GenericDatumReader[GenericRecord](schema)
val decoder = DecoderFactory.get().binaryDecoder(message, null)
avroType.fromGenericRecord(datumReader.read(null, decoder)).get
}
}
所以基本上我创建了一个 AvroType[T]
,它有两个方法 fromGenericRecord
和 toGenericRecord
(source)。这些方法需要一些隐式:LabelledGeneric.Aux[A, L]
、ToAvroRecord[L]
、tt: TypeTag[A]
和 fromL: FromAvroRecord[L]
。
目前,由于缺少这些隐含函数,这段代码给出了编译错误:
Error:(48, 51) could not find implicit value for parameter gen: shapeless.LabelledGeneric.Aux[T,L]
val genericRecord = avroType.toGenericRecord(value)
简单地重载 toGenericRecord
和 fromGenericRecord
方法中的隐含函数并不能解决问题,因为那时我需要参数化 serialize[L <: Hlist]
和 deserialize[L <: Hlist]
我不能这样做是因为 Flink 不允许这些方法有类型。
我对 shapeless 和 implicit 都没有什么经验,无法理解我需要哪些上下文边界来解决这个问题,同时还要保持这个 class 可序列化。
希望有人能帮助我或指出一些有用的资源。
谢谢, 沃特
编辑
我不能通过这些方法传递隐式函数,也不能使它们参数化,因为我需要将 serde 基于 Flink 的序列化接口,这迫使我覆盖:byte[] serialize(T element)
和 T deserialize(byte[] message)
如果我尝试将隐式传递给 class 本身,我需要将其更改为:
class Serializer[T : TypeTag : ClassTag, L <: HList](implicit gen: LabelledGeneric.Aux[T, L], toL: ToAvroRecord[L], fromL: FromAvroRecord[L])
但是如果我像这样实例化它:
case class Test(str: String)
val serializer = new Serializer[Test]
我得到这个编译错误:
Error:(29, 26) wrong number of type arguments for shapeless.datatype.avro.Serializer, should be 2
val serializer = new Serializer[Test]
你应该让 Serializer
变成 type class。 (顺便说一下,在没有必要的情况下使用 var
s 是一种不好的做法。)
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{BinaryEncoder, DatumWriter, DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.ReflectData
import org.apache.avro.specific.SpecificRecordBase
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation
import shapeless.datatype.avro.{AvroType, FromAvroRecord, ToAvroRecord}
import shapeless.{HList, LabelledGeneric}
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.{ClassTag, classTag}
trait Serializer[T] extends SerializationSchema[T] with DeserializationSchema[T] {
type L <: HList
}
object Serializer {
type Aux[T, L0 <: HList] = Serializer[T] { type L = L0 }
def apply[T](implicit serializer: Serializer[T]): Serializer[T] = serializer
implicit def mkSerializer[T : ClassTag : TypeTag, L0 <: HList](implicit
gen: LabelledGeneric.Aux[T, L0],
toL: ToAvroRecord[L0],
fromL: FromAvroRecord[L0]): Aux[T, L0] =
new Serializer[T] {
type L = L0
//Get type of the class at run time
val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
//Get Avro Type
val avroType = AvroType[T]
override def serialize(value : T) : Array[Byte] = {
val schema: Schema =
if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
else ReflectData.get().getSchema(inputClassType)
val out: ByteArrayOutputStream = new ByteArrayOutputStream()
val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
val writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)
val genericRecord = avroType.toGenericRecord(value)
writer.write(genericRecord, encoder)
encoder.flush()
out.close()
out.toByteArray
}
override def deserialize(message: Array[Byte]) : T = {
val schema: Schema =
if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
else ReflectData.get().getSchema(inputClassType)
val datumReader = new GenericDatumReader[GenericRecord](schema)
val decoder = DecoderFactory.get().binaryDecoder(message, null)
avroType.fromGenericRecord(datumReader.read(null, decoder)).get
}
override def isEndOfStream(nextElement: T): Boolean = ???
override def getProducedType: TypeInformation[T] = ???
}
}
case class Test(str: String)
val serializer = Serializer[Test]