Spark:在本地模式下广播使用

Spark: Broadcast usage on local mode

我知道广播允许在每台机器上缓存一个只读副本,而不是将它的副本与任务一起发送。 但是,我想知道广播在本地模式下使用时是否有任何巨大影响,因为我没有节点集群。 或者在本地模式下不广播就可以使用吗?我只是想了解它的用法。

Spark 版本 #2.0,Scala 版本 #2.10 本地模式 - 8 核 CPU 64GB RAM

我有如下内容:

case class EmpDim(name: String,age: Int)

empDF
+-----+-------+------+
|EmpId|EmpName|EmpAge|
+-----+-------+------+
|    1|   John|    32|
|    2|  David|    45|
+-----+-------+------+

deptDF
+------+--------+-----+
|DeptID|DeptName|EmpID|
+------+--------+-----+
|     1|   Admin|    1|
|     2|      HR|    2|
|     3| Finance|    4|
+------+--------+-----+

val empRDD = empDF.rdd.map(x => (x.getInt(0), EmpDim(x.getString(1), x.getInt(2))))

val lookupMap = empRDD.collectAsMap() //Without Broadcast
val broadCastLookupMap: Broadcast[Map[Int,EmpDim]] = sc.broadcast(empRDD.collectAsMap()) //With Broadcast

def lookup(lookupMap:Map[Int,EmpDim]) = udf[Option[EmpDim],Int]((empID:Int) => lookupMap.lift(empID))

val combinedDF = deptDF.withColumn("lookupEmp",lookup(lookupMap)($"EmpID")) //Without Broadcast
                       .withColumn("broadCastLookupEmp",lookup(broadCastLookupMap.value)($"EmpID")) //With Broadcast
                       .withColumn("EmpName",coalesce($"lookupEmp.name",lit("Unknown - No Name to Lookup")))
                       .withColumn("EmpAge",coalesce($"lookupEmp.age",lit("Unknown - No Age to Lookup")))
                       .drop("lookupEmp")
                       .drop("broadCastLookupEmp")

+------+--------+-----+---------------------------+--------------------------+
|DeptID|DeptName|EmpID|EmpName                    |EmpAge                    |
+------+--------+-----+---------------------------+--------------------------+
|1     |Admin   |1    |John                       |32                        |
|2     |HR      |2    |David                      |45                        |
|3     |Finance |4    |Unknown - No Name to Lookup|Unknown - No Age to Lookup|
+------+--------+-----+---------------------------+--------------------------+

在上述情况下,使用广播是否明智,还是有点矫枉过正?请指教

这样用的话广播就没有任何价值了

当你打电话时:

lookup(broadCastLookupMap.value)($"EmpID")

broadCastLookupMap.value 将根据 Scala 替换模型在本地进行评估。

正确的实施方式是:

def lookup(lookupMap: Broadcast[Map[Int, EmpDim]]) = udf[Option[EmpDim],Int](
  (empID:Int) => lookupMap.value.lift(empID)
)

并调用:

lookup(broadCastLookupMap)($"EmpID")

这可能会产生一些积极的影响,具体取决于实际的执行计划。本地或非本地模式 - 适用相同的规则

  • 如果数据在阶段之间重复使用(显式或隐式),广播可能很有用。
  • 如果数据在管道中只使用一次,标准的闭包/参数处理机制就足够了。

这里没有任何迹象表明第一种情况,因此广播应该已过时,但如果您想确定,请使用实时环境测试这两种解决方案并比较结果。

按姓名呼叫也应该有效:

def lookup(lookupMap: => Map[Int,EmpDim]) = udf[Option[EmpDim],Int](
  (empID:Int) => lookupMap.lift(empID)
)