ProtoBuf 字段的 Spark、Kryo 序列化问题
Spark, Kryo Serialization Issue with ProtoBuf field
当我的 spark 作业与转换 RDD 时 protobuf 字段的序列化相关时 运行 我看到一个错误。
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
序列化跟踪:
otherAuthors_ (com.thomsonreuters.kraken.medusa.dbor.proto.Book$DBBooks)
错误似乎是在此时产生的:
val booksPerTier: Iterable[(TimeTier, RDD[DBBooks])] = allTiers.map {
tier => (tier, books.filter(b => isInTier(endOfInterval, tier, b) && !isBookPublished(o)).mapPartitions( it =>
it.map{ord =>
(ord.getAuthor, ord.getPublisherName, getGenre(ord.getSourceCountry))}))
}
val averagesPerAuthor = booksPerTier.flatMap { case (tier, opt) =>
opt.map(o => (tier, o._1, PublisherCompanyComparison, o._3)).countByValue()
}
val averagesPerPublisher = booksPerTier.flatMap { case (tier, opt) =>
opt.map(o => (tier, o._1, PublisherComparison(o._2), o._3)).countByValue()
}
该字段是protobuf中指定的列表,如下所示:
otherAuthors_ = java.util.Collections.emptyList()
如您所见,代码实际上并未使用 Book Protobuf 中的该字段,尽管它仍在通过网络传输。
有人对此有什么建议吗?
好吧,老问题了,但这是留给子孙后代的答案。默认的 kryo 序列化程序不适用于某些集合。有一个第三方库可以帮助它:kryo-serializers
在您的情况下,您可能需要在创建 spark 配置时提供自定义 kryo 注册器:
val conf = new SparkConf()
conf.set("spark.kryo.registrator", "MyKryoRegistrator")
在您的注册器中需要自定义注册:
class MyKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register( Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer() );
// Probably should use proto serializer for your proto classes
kryo.register( Book.class, new ProtobufSerializer() );
}
}
当我的 spark 作业与转换 RDD 时 protobuf 字段的序列化相关时 运行 我看到一个错误。
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException 序列化跟踪: otherAuthors_ (com.thomsonreuters.kraken.medusa.dbor.proto.Book$DBBooks)
错误似乎是在此时产生的:
val booksPerTier: Iterable[(TimeTier, RDD[DBBooks])] = allTiers.map {
tier => (tier, books.filter(b => isInTier(endOfInterval, tier, b) && !isBookPublished(o)).mapPartitions( it =>
it.map{ord =>
(ord.getAuthor, ord.getPublisherName, getGenre(ord.getSourceCountry))}))
}
val averagesPerAuthor = booksPerTier.flatMap { case (tier, opt) =>
opt.map(o => (tier, o._1, PublisherCompanyComparison, o._3)).countByValue()
}
val averagesPerPublisher = booksPerTier.flatMap { case (tier, opt) =>
opt.map(o => (tier, o._1, PublisherComparison(o._2), o._3)).countByValue()
}
该字段是protobuf中指定的列表,如下所示:
otherAuthors_ = java.util.Collections.emptyList()
如您所见,代码实际上并未使用 Book Protobuf 中的该字段,尽管它仍在通过网络传输。
有人对此有什么建议吗?
好吧,老问题了,但这是留给子孙后代的答案。默认的 kryo 序列化程序不适用于某些集合。有一个第三方库可以帮助它:kryo-serializers
在您的情况下,您可能需要在创建 spark 配置时提供自定义 kryo 注册器:
val conf = new SparkConf()
conf.set("spark.kryo.registrator", "MyKryoRegistrator")
在您的注册器中需要自定义注册:
class MyKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) {
kryo.register( Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer() );
// Probably should use proto serializer for your proto classes
kryo.register( Book.class, new ProtobufSerializer() );
}
}