Spark:任务不可序列化 (Broadcast/RDD/SparkContext)

Spark: Task not serializable (Broadcast/RDD/SparkContext)

Spark 中有很多关于 Task is not serializable 的问题。不过这个case好像挺特殊的。

我创建了一个 class:

class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
  val allEs: RDD[(String, E)] = e.map(e => (e.w, e))
    .persist()
  val sc = allEs.sparkContext
  val centroids = sc.broadcast(m.clusterCenters)
  [...]

class定义了如下方法:

private def centroidDistances(v: Vector): Array[Double] = {
  centroids.value.map(c => (centroids.value.indexOf(c), Vectors.sqdist(v, c)))
    .sortBy(_._1)
    .map(_._2)
}

但是,当调用 class 时,抛出 Task is not serializable 异常。

很奇怪,class Neighbours 的 header 中的微小变化足以解决问题。我没有创建用于广播的 val sc: SparkContext,而是内联创建 Spark 上下文的代码:

class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
  val allEs: RDD[(String, E)] = e.map(e => (e.w, e))
  .setName("embeddings")
  .persist()
  val centroids = allEmbeddings.sparkContext(m.clusterCenters)
  [...]

我的问题是:第二个变体有何不同?第一个出了什么问题?从直觉上看,这应该只是语法糖,这是Spark中的一个错误吗?

我在 Hadoop/Yarn 集群上使用 Spark 1.4.1。

当你定义

class Neighbours(e: RDD[E], m: KMeansModel) extends Serializable {
  ...
  val sc = allEmbeddings.sparkContext
  val centroids = sc.broadcast(m.clusterCenters)
  ...
}

您已将 sc 设为 class 变量,这意味着它可以从 Neighbours 的实例访问,例如neighbours.sc。这意味着 sc 需要可序列化,但事实并非如此。

内联代码时,只有 centroids 的最终值需要序列化。 centroids 是可序列化的 Broadcast 类型。