需要在 Spark (Scala) 中进行 kryo 序列化
Require kryo serialization in Spark (Scala)
我用这个打开了 kryo 序列化:
conf.set( "spark.serializer", "org.apache.spark.serializer.KryoSerializer" )
我想确保自定义 class 在节点之间洗牌时使用 kryo 序列化。我可以这样用 kryo 注册 class:
conf.registerKryoClasses(Array(classOf[Foo]))
据我了解,这实际上并不能保证使用kyro序列化;如果序列化器不可用,kryo 将回退到 Java 序列化。
为了保证 kryo 序列化发生,我遵循了 Spark 文档中的建议:
conf.set("spark.kryo.registrationRequired", "true")
但这会导致抛出 IllegalArugmentException ("Class is not registered") 一堆不同的 classes,我假设 Spark 在内部使用,例如以下内容:
org.apache.spark.util.collection.CompactBuffer
scala.Tuple3
我当然不需要用 kryo 手动注册每个 classes 吗?这些序列化器都是在kryo中定义的,那么有没有办法自动注册它们呢?
As I understand it, this does not actually guarantee that kyro serialization is used; if a serializer is not available, kryo will fall back to Java serialization.
没有。如果您将 spark.serializer
设置为 org.apache.spark.serializer.
KryoSerializer
,那么 Spark 将使用 Kryo。如果 Kryo 不可用,您将收到错误消息。没有后备。
那么这个 Kryo 注册是什么?
当 Kryo 序列化未注册 class 的实例时,它必须输出完全限定的 class 名称。这是很多字符。相反,如果一个 class 已经被预先注册,Kryo 可以只输出一个数字引用到这个 class,这只是 1-2 个字节。
当 RDD 的每一行都用 Kryo 序列化时,这一点尤其重要。您不想为十亿行中的每一行包含相同的 class 名称。因此,您预先注册了这些 classes。但是很容易忘记注册一个新的 class 然后你又在浪费字节。解决办法是要求每个class都注册:
conf.set("spark.kryo.registrationRequired", "true")
现在 Kryo 将永远不会输出完整的 class 名称。如果遇到未注册的class,那就是运行时错误。
不幸的是,很难枚举所有您要提前序列化的 classes。这个想法是 Spark 注册特定于 Spark 的 classes,然后您注册其他所有内容。你有一个RDD[(X, Y, Z)]
?您必须注册 classOf[scala.Tuple3[_, _, _]]
.
list of classes that Spark registers actually includes CompactBuffer
, so if you see an error for that, you're doing something wrong. You are bypassing the Spark registration procedure. You have to use either spark.kryo.classesToRegister
or spark.kryo.registrator
to register your classes. (See the config options. If you use GraphX, your registrator should call GraphXUtils. registerKryoClasses.)
根据您所看到的情况,最好的猜测是您遗漏了以下声明:
sparkConf.set( "spark.serializer", "org.apache.spark.serializer.KryoSerializer" )
在过去的几天里,我也一直在努力将序列化转换为 Kryo,包括 GraphX,包括向 Kryo 注册 scala.Tuple3
,显然是因为 Spark/GraphX 代码在我创建 Tuple3 时做一个 'sortBy'.
一直在逐一添加一堆其他 类 来列出以注册 Kryo,主要是 Scala 和 Spark 类 我认为我不需要添加。 Thinking/hoping 必须有更好的方法将 Kryo 与 Spark 结合使用。
我有一个方法可以获取所有需要快速注册的 class 名称。
implicit class FieldExtensions(private val obj: Object) extends AnyVal {
def readFieldAs[T](fieldName: String): T = {
FieldUtils.readField(obj, fieldName, true).asInstanceOf[T]
}
def writeField(fieldName: String, value: Object): Unit = {
FieldUtils.writeField(obj, fieldName, value, true)
}
}
class LogClassResolver extends DefaultClassResolver {
override def registerImplicit(t: Class[_]): Registration = {
println(s"registerImplicitclasstype:${t.getName}")
super.registerImplicit(t)
}
def copyFrom(resolver: DefaultClassResolver): Unit = {
this.kryo = resolver.readFieldAs("kryo")
this.idToRegistration.putAll(resolver.readFieldAs("idToRegistration"))
this.classToRegistration.putAll(resolver.readFieldAs("classToRegistration"))
this.classToNameId = resolver.readFieldAs("classToNameId")
this.nameIdToClass = resolver.readFieldAs("nameIdToClass")
this.nameToClass = resolver.readFieldAs("nameToClass")
this.nextNameId = resolver.readFieldAs("nextNameId")
this.writeField("memoizedClassId", resolver.readFieldAs("memoizedClassId"))
this.writeField("memoizedClassIdValue", resolver.readFieldAs("memoizedClassIdValue"))
this.writeField("memoizedClass", resolver.readFieldAs("memoizedClass"))
this.writeField("memoizedClassValue", resolver.readFieldAs("memoizedClassValue"))
}
}
class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
val newResolver = new LogClassResolver
newResolver.copyFrom(kryo.getClassResolver.asInstanceOf[DefaultClassResolver])
FieldUtils.writeField(kryo, "classResolver", newResolver, true)
}
}
你只需要在 spark 会话中注册 MyRegistrator
。
val sparkSession = SparkSession.builder()
.appName("Your_Spark_App")
.config("spark.kryo.registrator", classOf[MyRegistrator].getTypeName)
.getOrCreate()
// all your spark logic will be added here
之后,向集群提交一个小示例 spark 应用程序,所有需要注册的 class 名称将被打印到标准输出。然后下面的 linux 命令将得到 class 名称列表:
yarn logs --applicationId {your_spark_app_id} | grep registerImplicitclasstype >> type_names.txt
sort -u type_names.txt
然后在您的注册器中注册所有 class 名称:
kryo.registser(Class.forName("class name"))
之后,您可以将 config("spark.kryo.registrationRequired", "true")
添加到 spark conf。
有时 yarn logs 可能会丢失,您可以重新运行上述过程。
ps:上面的代码适用于 spark 版本 2.1.2。
尽情享受吧。
我用这个打开了 kryo 序列化:
conf.set( "spark.serializer", "org.apache.spark.serializer.KryoSerializer" )
我想确保自定义 class 在节点之间洗牌时使用 kryo 序列化。我可以这样用 kryo 注册 class:
conf.registerKryoClasses(Array(classOf[Foo]))
据我了解,这实际上并不能保证使用kyro序列化;如果序列化器不可用,kryo 将回退到 Java 序列化。
为了保证 kryo 序列化发生,我遵循了 Spark 文档中的建议:
conf.set("spark.kryo.registrationRequired", "true")
但这会导致抛出 IllegalArugmentException ("Class is not registered") 一堆不同的 classes,我假设 Spark 在内部使用,例如以下内容:
org.apache.spark.util.collection.CompactBuffer
scala.Tuple3
我当然不需要用 kryo 手动注册每个 classes 吗?这些序列化器都是在kryo中定义的,那么有没有办法自动注册它们呢?
As I understand it, this does not actually guarantee that kyro serialization is used; if a serializer is not available, kryo will fall back to Java serialization.
没有。如果您将 spark.serializer
设置为 org.apache.spark.serializer.
KryoSerializer
,那么 Spark 将使用 Kryo。如果 Kryo 不可用,您将收到错误消息。没有后备。
那么这个 Kryo 注册是什么?
当 Kryo 序列化未注册 class 的实例时,它必须输出完全限定的 class 名称。这是很多字符。相反,如果一个 class 已经被预先注册,Kryo 可以只输出一个数字引用到这个 class,这只是 1-2 个字节。
当 RDD 的每一行都用 Kryo 序列化时,这一点尤其重要。您不想为十亿行中的每一行包含相同的 class 名称。因此,您预先注册了这些 classes。但是很容易忘记注册一个新的 class 然后你又在浪费字节。解决办法是要求每个class都注册:
conf.set("spark.kryo.registrationRequired", "true")
现在 Kryo 将永远不会输出完整的 class 名称。如果遇到未注册的class,那就是运行时错误。
不幸的是,很难枚举所有您要提前序列化的 classes。这个想法是 Spark 注册特定于 Spark 的 classes,然后您注册其他所有内容。你有一个RDD[(X, Y, Z)]
?您必须注册 classOf[scala.Tuple3[_, _, _]]
.
list of classes that Spark registers actually includes CompactBuffer
, so if you see an error for that, you're doing something wrong. You are bypassing the Spark registration procedure. You have to use either spark.kryo.classesToRegister
or spark.kryo.registrator
to register your classes. (See the config options. If you use GraphX, your registrator should call GraphXUtils. registerKryoClasses.)
根据您所看到的情况,最好的猜测是您遗漏了以下声明:
sparkConf.set( "spark.serializer", "org.apache.spark.serializer.KryoSerializer" )
在过去的几天里,我也一直在努力将序列化转换为 Kryo,包括 GraphX,包括向 Kryo 注册 scala.Tuple3
,显然是因为 Spark/GraphX 代码在我创建 Tuple3 时做一个 'sortBy'.
一直在逐一添加一堆其他 类 来列出以注册 Kryo,主要是 Scala 和 Spark 类 我认为我不需要添加。 Thinking/hoping 必须有更好的方法将 Kryo 与 Spark 结合使用。
我有一个方法可以获取所有需要快速注册的 class 名称。
implicit class FieldExtensions(private val obj: Object) extends AnyVal {
def readFieldAs[T](fieldName: String): T = {
FieldUtils.readField(obj, fieldName, true).asInstanceOf[T]
}
def writeField(fieldName: String, value: Object): Unit = {
FieldUtils.writeField(obj, fieldName, value, true)
}
}
class LogClassResolver extends DefaultClassResolver {
override def registerImplicit(t: Class[_]): Registration = {
println(s"registerImplicitclasstype:${t.getName}")
super.registerImplicit(t)
}
def copyFrom(resolver: DefaultClassResolver): Unit = {
this.kryo = resolver.readFieldAs("kryo")
this.idToRegistration.putAll(resolver.readFieldAs("idToRegistration"))
this.classToRegistration.putAll(resolver.readFieldAs("classToRegistration"))
this.classToNameId = resolver.readFieldAs("classToNameId")
this.nameIdToClass = resolver.readFieldAs("nameIdToClass")
this.nameToClass = resolver.readFieldAs("nameToClass")
this.nextNameId = resolver.readFieldAs("nextNameId")
this.writeField("memoizedClassId", resolver.readFieldAs("memoizedClassId"))
this.writeField("memoizedClassIdValue", resolver.readFieldAs("memoizedClassIdValue"))
this.writeField("memoizedClass", resolver.readFieldAs("memoizedClass"))
this.writeField("memoizedClassValue", resolver.readFieldAs("memoizedClassValue"))
}
}
class MyRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
val newResolver = new LogClassResolver
newResolver.copyFrom(kryo.getClassResolver.asInstanceOf[DefaultClassResolver])
FieldUtils.writeField(kryo, "classResolver", newResolver, true)
}
}
你只需要在 spark 会话中注册 MyRegistrator
。
val sparkSession = SparkSession.builder()
.appName("Your_Spark_App")
.config("spark.kryo.registrator", classOf[MyRegistrator].getTypeName)
.getOrCreate()
// all your spark logic will be added here
之后,向集群提交一个小示例 spark 应用程序,所有需要注册的 class 名称将被打印到标准输出。然后下面的 linux 命令将得到 class 名称列表:
yarn logs --applicationId {your_spark_app_id} | grep registerImplicitclasstype >> type_names.txt
sort -u type_names.txt
然后在您的注册器中注册所有 class 名称: kryo.registser(Class.forName("class name"))
之后,您可以将 config("spark.kryo.registrationRequired", "true")
添加到 spark conf。
有时 yarn logs 可能会丢失,您可以重新运行上述过程。
ps:上面的代码适用于 spark 版本 2.1.2。
尽情享受吧。