Spark 广播大列表序列化失败

Spark broadcast big list fail to serialize

我有一个很大的字符串列表(140866 个元素)需要一些时间来计算。计算完成后,我想在 UDF 或我的 DataFrame 的地图中使用此列表。我按照一些教程找到了这个例子

  val states = List("NY","New York","CA","California","FL","Florida")
  val countries = Map(("USA","United States of America"),("IN","India"))

  val broadcastStates = spark.sparkContext.broadcast(states)
  val broadcastCountries = spark.sparkContext.broadcast(countries)

  val data = Seq(("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  )

  val columns = Seq("firstname","lastname","country","state")
  import spark.sqlContext.implicits._
  val df = data.toDF(columns:_*)

  val df2 = df.map(row=>{
    val country = row.getString(2)
    val state = row.getString(3)

    val fullCountry = broadcastCountries.value.get(country).get
    val fullState = broadcastStates.value(0)
    (row.getString(0),row.getString(1),fullCountry,fullState)
  }).toDF(columns:_*)

  df2.show(false)

效果很好。

但是当我尝试使用我的列表时出现此错误

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex.apply(RDD.scala:850)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex.apply(RDD.scala:849)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:630)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:156)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:283)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:375)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
  at org.apache.spark.sql.Dataset$$anonfun$head.apply(Dataset.scala:2550)
  at org.apache.spark.sql.Dataset$$anonfun$head.apply(Dataset.scala:2550)
  at org.apache.spark.sql.Dataset$$anonfun.apply(Dataset.scala:3370)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:753)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:730)
  ... 54 elided
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@6ebc6ccc)

我用

得到了我的清单
val myList = spark.read.option("header",true)
            .csv(NER_PATH_S3)
            .na.drop()
            .filter(col("label") =!= "VALUE" )
            .groupBy("keyword")
            .agg(sum("n_occurences").alias("n_occurences"))
            .filter(col("n_occurences") > 2)
            .filter($"keyword".rlike("[^0-9]+"))
            .select("keyword")
            .collect()
            .map(x => x(0).toString)
            .toList

val myListBroadcast = sc.broadcast(myList)

我确保我的类型与我的示例完全相同,我还尝试通过切片来减小列表的大小。

根据我的说法而不是使用

sc.broadcast(myList)

你可以使用

spark.sparkContext.broadcast(myList)

这应该有效。

我遇到过类似的问题,当我将代码更改为我建议的代码时,它非常有效。 快乐学习。