Spark 中的 NotSerializableException

NotSerializableException in Spark

大多数在线不可序列化问题都获得非常基本的数据作为其 sc.parallelize() 的输入,并且在地图部分中他们遇到了不可序列化问题,但我的是一种类型。我有一个特定的数据类型,它来自第三方库并且不可序列化。 所以写这个显示 NotSerializableException :

val data: RDD[ThirdPartyLib.models.XData] = sc.parallelize(ThirdPartyLib.getX)

data.foreachPartition(rows => {
  rows.foreach(row => {
    println("value: " + row.getValue)    
  })  
})

作为解决方案,我在内部创建了相同的模型 class(XData) 但使其可序列化并执行了以下操作:

val data: RDD[XData] = (sc.parallelize(ThirdPartyLib.getX)).asInstanceOf[RDD[XData]]

data.foreachPartition(rows => {
  rows.foreach(row => {
    println("value: " + row.getValue)    
  })  
})

我期待问题得到解决,但我仍然收到与 [ERROR] org.apache.spark.util.Utils logError - Exception encountered java.io.NotSerializableException: ThirdPartyLib.models.XData 相同的错误。当我创建该内部可序列化类型时,问题不应该重新解决吗?我该如何解决这个问题?

所以

(sc.parallelize(ThirdPartyLib.getX)).asInstanceOf[RDD[XData]]

你先并行化,然后转换。所以 spark 仍然需要 ThirdPartyLib.models.XData 才能序列化。此外,由于类型不同,该演员表可能会爆炸。

我认为这应该可以解决问题

def convertThirdPartyXDataToMyXData( xd: ThirdPartyLib.models.XData): XData = ???

val data: RDD[ThirdPartyLib.models.XData] = sc.parallelize(ThirdPartyLib.getX.map(convertThirdPartyXDataToMyXData)) //if you have a map on the collection that getX returns