Spark:任务不可序列化 (Broadcast/RDD/SparkContext)
Spark: Task not serializable (Broadcast/RDD/SparkContext)
Spark 中有很多关于 Task is not serializable
的问题。不过这个case好像挺特殊的。
我创建了一个 class:
class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
val allEs: RDD[(String, E)] = e.map(e => (e.w, e))
.persist()
val sc = allEs.sparkContext
val centroids = sc.broadcast(m.clusterCenters)
[...]
class定义了如下方法:
private def centroidDistances(v: Vector): Array[Double] = {
centroids.value.map(c => (centroids.value.indexOf(c), Vectors.sqdist(v, c)))
.sortBy(_._1)
.map(_._2)
}
但是,当调用 class 时,抛出 Task is not serializable
异常。
很奇怪,class Neighbours
的 header 中的微小变化足以解决问题。我没有创建用于广播的 val sc: SparkContext
,而是内联创建 Spark 上下文的代码:
class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
val allEs: RDD[(String, E)] = e.map(e => (e.w, e))
.setName("embeddings")
.persist()
val centroids = allEmbeddings.sparkContext(m.clusterCenters)
[...]
我的问题是:第二个变体有何不同?第一个出了什么问题?从直觉上看,这应该只是语法糖,这是Spark中的一个错误吗?
我在 Hadoop/Yarn 集群上使用 Spark 1.4.1。
当你定义
class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
...
val sc = allEmbeddings.sparkContext
val centroids = sc.broadcast(m.clusterCenters)
...
}
您已将 sc
设为 class 变量,这意味着它可以从 Neighbours
的实例访问,例如neighbours.sc
。这意味着 sc
需要可序列化,但事实并非如此。
内联代码时,只有 centroids
的最终值需要序列化。 centroids
是可序列化的 Broadcast
类型。
Spark 中有很多关于 Task is not serializable
的问题。不过这个case好像挺特殊的。
我创建了一个 class:
class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
val allEs: RDD[(String, E)] = e.map(e => (e.w, e))
.persist()
val sc = allEs.sparkContext
val centroids = sc.broadcast(m.clusterCenters)
[...]
class定义了如下方法:
private def centroidDistances(v: Vector): Array[Double] = {
centroids.value.map(c => (centroids.value.indexOf(c), Vectors.sqdist(v, c)))
.sortBy(_._1)
.map(_._2)
}
但是,当调用 class 时,抛出 Task is not serializable
异常。
很奇怪,class Neighbours
的 header 中的微小变化足以解决问题。我没有创建用于广播的 val sc: SparkContext
,而是内联创建 Spark 上下文的代码:
class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
val allEs: RDD[(String, E)] = e.map(e => (e.w, e))
.setName("embeddings")
.persist()
val centroids = allEmbeddings.sparkContext(m.clusterCenters)
[...]
我的问题是:第二个变体有何不同?第一个出了什么问题?从直觉上看,这应该只是语法糖,这是Spark中的一个错误吗?
我在 Hadoop/Yarn 集群上使用 Spark 1.4.1。
当你定义
class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
...
val sc = allEmbeddings.sparkContext
val centroids = sc.broadcast(m.clusterCenters)
...
}
您已将 sc
设为 class 变量,这意味着它可以从 Neighbours
的实例访问,例如neighbours.sc
。这意味着 sc
需要可序列化,但事实并非如此。
内联代码时,只有 centroids
的最终值需要序列化。 centroids
是可序列化的 Broadcast
类型。