Databrick Azure 广播变量不可序列化
Databrick Azure broadcast variables not serializable
所以我正在尝试使用 Azure Databricks 创建一个非常简单的 spark notebook,并且想使用一个简单的 RDD 映射调用。
这只是为了搞乱,所以这个例子有点做作,但我无法在 RDD 映射调用中获得一个值,除非它是一个静态常量值
我试过使用广播变量
这是一个使用 int 的简单示例,我广播了它,然后尝试在 RDD 映射中使用它
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val multiplier = 3
val multiplierBroadcast = sparkContext.broadcast(multiplier)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => multiplierBroadcast.value)
val df = mappedRdd.toDF
df.show()
这是另一个例子,我使用简单的可序列化单例对象和一个我广播的 int 字段,然后尝试在 RDD 映射中使用
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val multiplier = 3
object Foo extends Serializable { val theMultiplier: Int = multiplier}
val fooBroadcast = sparkContext.broadcast(Foo)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => fooBroadcast.value.theMultiplier)
val df = mappedRdd.toDF
df.show()
最后是一个 List[int]
我广播了一个元素,然后尝试在 RDD 映射中使用
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val multiplier = 3
val listBroadcast = sparkContext.broadcast(List(multiplier))
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => listBroadcast.value.head)
val df = mappedRdd.toDF
df.show()
但是上面的所有示例都因此错误而失败。如您所见,这指出了 RDD 映射值不可序列化的问题。我看不出这个问题,并且 int 值应该可以使用我认为的所有上述示例进行序列化
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2375)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:379)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:378)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
at org.apache.spark.rdd.RDD.map(RDD.scala:378)
如果我将 RDD 映射中的值设置为像这样的常规 int 值
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => 6)
val df = mappedRdd.toDF
df.show()
一切正常,我看到我的简单 DataFrame 按预期显示
有什么想法吗?
根据您的代码,我假设您使用的是 Spark 2+。也许,没有必要下降到 RDD 级别,而是使用 DataFrames。
下面的代码展示了如何加入两个 DataFrame 并显式广播第一个。
import sparkSession.implicits._
import org.apache.spark.sql.functions._
val data = Seq(1, 2, 3, 4, 5)
val dataDF = data.toDF("id")
val largeDataDF = Seq((0, "Apple"), (1, "Pear"), (2, "Banana")).toDF("id", "value")
val df = largeDataDF.join(broadcast(dataDF), Seq("id"))
df.show()
通常,小型 DataFrame 是广播的完美候选者,作为一种优化,它们被发送到所有执行者。 spark.sql.autoBroadcastJoinThreshold 是一种限制符合广播条件的数据帧大小的配置。可以在 Spark official documentation
上找到更多详细信息
另请注意,使用 DataFrames,您可以使用方便的 explain 方法。有了它,您可以看到物理计划,它可以用于调试。
我们示例中的 运行 explain() 将确认 Spark 正在执行 BroadcastHashJoin 优化。
df.explain()
== Physical Plan ==
*Project [id#11, value#12]
+- *BroadcastHashJoin [id#11], [id#3], Inner, BuildRight
:- LocalTableScan [id#11, value#12]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [id#3]
如果您需要有关 DataFrames 的更多帮助,我在 http://allaboutscala.com/big-data/spark/
提供了广泛的示例列表
所以答案是您不应该在 val 中捕获 Spark 内容,然后将其用于广播。所以这是工作代码
import sqlContext.implicits._
val multiplier = 3
val multiplierBroadcast = spark.sparkContext.broadcast(multiplier)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => multiplierBroadcast.value)
val df = mappedRdd.toDF
df.show()
感谢@nadim Bahadoor 的回答
所以我正在尝试使用 Azure Databricks 创建一个非常简单的 spark notebook,并且想使用一个简单的 RDD 映射调用。
这只是为了搞乱,所以这个例子有点做作,但我无法在 RDD 映射调用中获得一个值,除非它是一个静态常量值
我试过使用广播变量
这是一个使用 int 的简单示例,我广播了它,然后尝试在 RDD 映射中使用它
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val multiplier = 3
val multiplierBroadcast = sparkContext.broadcast(multiplier)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => multiplierBroadcast.value)
val df = mappedRdd.toDF
df.show()
这是另一个例子,我使用简单的可序列化单例对象和一个我广播的 int 字段,然后尝试在 RDD 映射中使用
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val multiplier = 3
object Foo extends Serializable { val theMultiplier: Int = multiplier}
val fooBroadcast = sparkContext.broadcast(Foo)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => fooBroadcast.value.theMultiplier)
val df = mappedRdd.toDF
df.show()
最后是一个 List[int]
我广播了一个元素,然后尝试在 RDD 映射中使用
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val multiplier = 3
val listBroadcast = sparkContext.broadcast(List(multiplier))
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => listBroadcast.value.head)
val df = mappedRdd.toDF
df.show()
但是上面的所有示例都因此错误而失败。如您所见,这指出了 RDD 映射值不可序列化的问题。我看不出这个问题,并且 int 值应该可以使用我认为的所有上述示例进行序列化
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2375)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:379)
at org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:378)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
at org.apache.spark.rdd.RDD.map(RDD.scala:378)
如果我将 RDD 映射中的值设置为像这样的常规 int 值
val sparkContext = spark.sparkContext
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => 6)
val df = mappedRdd.toDF
df.show()
一切正常,我看到我的简单 DataFrame 按预期显示
有什么想法吗?
根据您的代码,我假设您使用的是 Spark 2+。也许,没有必要下降到 RDD 级别,而是使用 DataFrames。
下面的代码展示了如何加入两个 DataFrame 并显式广播第一个。
import sparkSession.implicits._
import org.apache.spark.sql.functions._
val data = Seq(1, 2, 3, 4, 5)
val dataDF = data.toDF("id")
val largeDataDF = Seq((0, "Apple"), (1, "Pear"), (2, "Banana")).toDF("id", "value")
val df = largeDataDF.join(broadcast(dataDF), Seq("id"))
df.show()
通常,小型 DataFrame 是广播的完美候选者,作为一种优化,它们被发送到所有执行者。 spark.sql.autoBroadcastJoinThreshold 是一种限制符合广播条件的数据帧大小的配置。可以在 Spark official documentation
上找到更多详细信息另请注意,使用 DataFrames,您可以使用方便的 explain 方法。有了它,您可以看到物理计划,它可以用于调试。
我们示例中的运行 explain() 将确认 Spark 正在执行 BroadcastHashJoin 优化。
df.explain()
== Physical Plan ==
*Project [id#11, value#12]
+- *BroadcastHashJoin [id#11], [id#3], Inner, BuildRight
:- LocalTableScan [id#11, value#12]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- LocalTableScan [id#3]
如果您需要有关 DataFrames 的更多帮助,我在 http://allaboutscala.com/big-data/spark/
提供了广泛的示例列表所以答案是您不应该在 val 中捕获 Spark 内容,然后将其用于广播。所以这是工作代码
import sqlContext.implicits._
val multiplier = 3
val multiplierBroadcast = spark.sparkContext.broadcast(multiplier)
val data = Array(1, 2, 3, 4, 5)
val dataRdd = sparkContext.parallelize(data)
val mappedRdd = dataRdd.map(x => multiplierBroadcast.value)
val df = mappedRdd.toDF
df.show()
感谢@nadim Bahadoor 的回答