Databrick Azure 广播变量不可序列化

Databrick Azure broadcast variables not serializable

所以我正在尝试使用 Azure Databricks 创建一个非常简单的 spark notebook,并且想使用一个简单的 RDD 映射调用。

这只是为了搞乱,所以这个例子有点做作,但我无法在 RDD 映射调用中获得一个值,除非它是一个静态常量值

我试过使用广播变量

这是一个使用 int 的简单示例,我广播了它,然后尝试在 RDD 映射中使用它

val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext

import sqlContext.implicits._
val multiplier = 3
val multiplierBroadcast = sparkContext.broadcast(multiplier)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => multiplierBroadcast.value)
val df = mappedRdd.toDF
df.show()

这是另一个例子,我使用简单的可序列化单例对象和一个我广播的 int 字段,然后尝试在 RDD 映射中使用

val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext

import sqlContext.implicits._
val multiplier = 3
object Foo extends Serializable { val theMultiplier: Int = multiplier}
val fooBroadcast = sparkContext.broadcast(Foo)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => fooBroadcast.value.theMultiplier)
val df = mappedRdd.toDF
df.show()

最后是一个 List[int] 我广播了一个元素,然后尝试在 RDD 映射中使用

val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext

import sqlContext.implicits._
val multiplier = 3
val listBroadcast = sparkContext.broadcast(List(multiplier))
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => listBroadcast.value.head)
val df = mappedRdd.toDF
df.show()

但是上面的所有示例都因此错误而失败。如您所见,这指出了 RDD 映射值不可序列化的问题。我看不出这个问题,并且 int 值应该可以使用我认为的所有上述示例进行序列化

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2375)
    at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:379)
    at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:378)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
    at org.apache.spark.rdd.RDD.map(RDD.scala:378)

如果我将 RDD 映射中的值设置为像这样的常规 int 值

val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext

import sqlContext.implicits._
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => 6)
val df = mappedRdd.toDF
df.show()

一切正常,我看到我的简单 DataFrame 按预期显示

有什么想法吗?

根据您的代码,我假设您使用的是 Spark 2+。也许,没有必要下降到 RDD 级别,而是使用 DataFrames。

下面的代码展示了如何加入两个 DataFrame 并显式广播第一个。

import sparkSession.implicits._
import org.apache.spark.sql.functions._

val data = Seq(1, 2, 3, 4, 5)
val dataDF = data.toDF("id")

val largeDataDF = Seq((0, "Apple"), (1, "Pear"), (2, "Banana")).toDF("id", "value")
val df = largeDataDF.join(broadcast(dataDF), Seq("id"))

df.show()

通常,小型 DataFrame 是广播的完美候选者,作为一种优化,它们被发送到所有执行者。 spark.sql.autoBroadcastJoinThreshold 是一种限制符合广播条件的数据帧大小的配置。可以在 Spark official documentation

上找到更多详细信息

另请注意,使用 DataFrames,您可以使用方便的 explain 方法。有了它,您可以看到物理计划,它可以用于调试。

我们示例中的

运行 explain() 将确认 Spark 正在执行 BroadcastHashJoin 优化。

df.explain()

== Physical Plan ==
*Project [id#11, value#12]
+- *BroadcastHashJoin [id#11], [id#3], Inner, BuildRight
:- LocalTableScan [id#11, value#12]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [id#3]  

如果您需要有关 DataFrames 的更多帮助,我在 http://allaboutscala.com/big-data/spark/

提供了广泛的示例列表

所以答案是您不应该在 val 中捕获 Spark 内容,然后将其用于广播。所以这是工作代码

import sqlContext.implicits._
val multiplier = 3
val multiplierBroadcast = spark.sparkContext.broadcast(multiplier)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => multiplierBroadcast.value)
val df = mappedRdd.toDF
df.show()

感谢@nadim Bahadoor 的回答