SparkContext 在伴随对象中不可序列化
SparkContext not serializable inside a companion object
我目前正在尝试扩展使用 Scala 和 Spark 的机器学习应用程序。我正在使用我在 Github
上找到的 Dieterich Lawson 之前项目的结构
https://github.com/dieterichlawson/admm
该项目基本上使用 SparkContext 构建训练样本块的 RDD,然后对这些集合中的每一个执行本地计算(例如求解线性系统)。
我遵循相同的方案,但对于我的本地计算,我需要对每个训练样本块执行 L-BFGS 算法。为此,我想使用 mlLib 中的 L-BFGS 算法,它具有以下签名。
runLBFGS(RDD<scala.Tuple2<Object,Vector>> data, Gradient gradient,
Updater updater, int numCorrections, double convergenceTol,
int maxNumIterations, double regParam, Vector initialWeights)
正如它所说,该方法将训练样本的 RDD[Object,Vector] 作为输入。问题是,在每个工作人员的本地,我不再保留数据的 RDD 结构。因此,我试图在矩阵的每个块上使用 SparkContext 的并行化函数。但是当我这样做时,我得到了一个序列化程序异常。 (确切的异常消息在问题的末尾)。
这是对我如何处理 SparkContext 的详细解释。
首先,在主应用程序中,它用于打开文本文件,并用于 class LogRegressionXUpdate 的工厂:
val A = sc.textFile("ds1.csv")
A.checkpoint
val f = LogRegressionXUpdate.fromTextFile(A,params.rho,1024,sc)
在应用中,class LogRegressionXUpdate 实现如下
class LogRegressionXUpdate(val training: RDD[(Double, NV)],
val rho: Double) extends Function1[BDV[Double],Double] with Prox with Serializable{
def prox(x: BDV[Double], rho: Double): BDV[Double] = {
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val (weights, loss) = LBFGS.runLBFGS(
training,
new GradientForLogRegADMM(rho,fromBreeze(x)),
new SimpleUpdater(),
numCorrections,
convergenceTol,
maxNumIterations,
regParam,
fromBreeze(x))
toBreeze(weights.toArray).toDenseVector
}
def apply(x: BDV[Double]): Double = {
Math.pow(1,2.0)
}
}
具有以下伴生对象:
object LogRegressionXUpdate {
def fromTextFile(file: RDD[String], rho: Double, blockHeight: Int = 1024, @transient sc: SparkContext): RDF[LogRegressionXUpdate] = {
val fns = new BlockMatrix(file, blockHeight).blocks.
map(X => new LogRegressionXUpdate(sc.parallelize((X(*,::).map(fila => (fila(-1),fromBreeze(fila(0 to -2))))).toArray),rho))
new RDF[LogRegressionXUpdate](fns, 0L)
}
}
虽然我并不是真的需要 SparkContext 在本地构建每个 RDD,但此构造函数导致了序列化错误。我已经搜索了这个问题的解决方案,但添加 @transient 并没有解决它。
然后,我的问题是:是否真的有可能构建这些 "second layer RDDs" 或我被迫执行 L-BFGS 算法的非分布式版本。
提前致谢!
错误日志:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:294)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:293)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.map(RDD.scala:293)
at admm.functions.LogRegressionXUpdate$.fromTextFile(LogRegressionXUpdate.scala:70)
at admm.examples.Lasso$.run(Lasso.scala:96)
at admm.examples.Lasso$$anonfun$main.apply(Lasso.scala:70)
at admm.examples.Lasso$$anonfun$main.apply(Lasso.scala:69)
at scala.Option.map(Option.scala:145)
at admm.examples.Lasso$.main(Lasso.scala:69)
at admm.examples.Lasso.main(Lasso.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@20576557)
- field (class: admm.functions.LogRegressionXUpdate$$anonfun, name: sc, type: class org.apache.spark.SparkContext)
- object (class admm.functions.LogRegressionXUpdate$$anonfun, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
... 21 more
RDD
s 只能从驱动程序访问。每当你调用
myRDD.map(someObject.someMethod)
spark 序列化计算 someMethod
所需的任何内容,并将其发送给工作人员。在那里,该方法被反序列化,然后在每个分区上独立运行。
但是,您尝试使用 本身 使用 spark 的方法:您尝试创建一个新的 RDD。但是,这是不可能的,因为它们只能在驱动程序中创建。您看到的错误是 spark 尝试序列化 spark 上下文本身,因为每个块的计算都需要它。有关序列化的更多信息,请参阅 this 问题的第一个答案。
“...虽然我并不是真的需要 SparkContext 在本地构建每个 RDD”——实际上这正是您在调用 sc.parallelize
时正在做的事情。底线 - 您需要找到(或编写)L-BFGS 的本地实现。
我目前正在尝试扩展使用 Scala 和 Spark 的机器学习应用程序。我正在使用我在 Github
上找到的 Dieterich Lawson 之前项目的结构https://github.com/dieterichlawson/admm
该项目基本上使用 SparkContext 构建训练样本块的 RDD,然后对这些集合中的每一个执行本地计算(例如求解线性系统)。
我遵循相同的方案,但对于我的本地计算,我需要对每个训练样本块执行 L-BFGS 算法。为此,我想使用 mlLib 中的 L-BFGS 算法,它具有以下签名。
runLBFGS(RDD<scala.Tuple2<Object,Vector>> data, Gradient gradient,
Updater updater, int numCorrections, double convergenceTol,
int maxNumIterations, double regParam, Vector initialWeights)
正如它所说,该方法将训练样本的 RDD[Object,Vector] 作为输入。问题是,在每个工作人员的本地,我不再保留数据的 RDD 结构。因此,我试图在矩阵的每个块上使用 SparkContext 的并行化函数。但是当我这样做时,我得到了一个序列化程序异常。 (确切的异常消息在问题的末尾)。
这是对我如何处理 SparkContext 的详细解释。
首先,在主应用程序中,它用于打开文本文件,并用于 class LogRegressionXUpdate 的工厂:
val A = sc.textFile("ds1.csv")
A.checkpoint
val f = LogRegressionXUpdate.fromTextFile(A,params.rho,1024,sc)
在应用中,class LogRegressionXUpdate 实现如下
class LogRegressionXUpdate(val training: RDD[(Double, NV)],
val rho: Double) extends Function1[BDV[Double],Double] with Prox with Serializable{
def prox(x: BDV[Double], rho: Double): BDV[Double] = {
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val (weights, loss) = LBFGS.runLBFGS(
training,
new GradientForLogRegADMM(rho,fromBreeze(x)),
new SimpleUpdater(),
numCorrections,
convergenceTol,
maxNumIterations,
regParam,
fromBreeze(x))
toBreeze(weights.toArray).toDenseVector
}
def apply(x: BDV[Double]): Double = {
Math.pow(1,2.0)
}
}
具有以下伴生对象:
object LogRegressionXUpdate {
def fromTextFile(file: RDD[String], rho: Double, blockHeight: Int = 1024, @transient sc: SparkContext): RDF[LogRegressionXUpdate] = {
val fns = new BlockMatrix(file, blockHeight).blocks.
map(X => new LogRegressionXUpdate(sc.parallelize((X(*,::).map(fila => (fila(-1),fromBreeze(fila(0 to -2))))).toArray),rho))
new RDF[LogRegressionXUpdate](fns, 0L)
}
}
虽然我并不是真的需要 SparkContext 在本地构建每个 RDD,但此构造函数导致了序列化错误。我已经搜索了这个问题的解决方案,但添加 @transient 并没有解决它。 然后,我的问题是:是否真的有可能构建这些 "second layer RDDs" 或我被迫执行 L-BFGS 算法的非分布式版本。 提前致谢!
错误日志:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:294)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:293)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.map(RDD.scala:293)
at admm.functions.LogRegressionXUpdate$.fromTextFile(LogRegressionXUpdate.scala:70)
at admm.examples.Lasso$.run(Lasso.scala:96)
at admm.examples.Lasso$$anonfun$main.apply(Lasso.scala:70)
at admm.examples.Lasso$$anonfun$main.apply(Lasso.scala:69)
at scala.Option.map(Option.scala:145)
at admm.examples.Lasso$.main(Lasso.scala:69)
at admm.examples.Lasso.main(Lasso.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@20576557)
- field (class: admm.functions.LogRegressionXUpdate$$anonfun, name: sc, type: class org.apache.spark.SparkContext)
- object (class admm.functions.LogRegressionXUpdate$$anonfun, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
... 21 more
RDD
s 只能从驱动程序访问。每当你调用
myRDD.map(someObject.someMethod)
spark 序列化计算 someMethod
所需的任何内容,并将其发送给工作人员。在那里,该方法被反序列化,然后在每个分区上独立运行。
但是,您尝试使用 本身 使用 spark 的方法:您尝试创建一个新的 RDD。但是,这是不可能的,因为它们只能在驱动程序中创建。您看到的错误是 spark 尝试序列化 spark 上下文本身,因为每个块的计算都需要它。有关序列化的更多信息,请参阅 this 问题的第一个答案。
“...虽然我并不是真的需要 SparkContext 在本地构建每个 RDD”——实际上这正是您在调用 sc.parallelize
时正在做的事情。底线 - 您需要找到(或编写)L-BFGS 的本地实现。