Spark:在本地模式下广播使用
Spark: Broadcast usage on local mode
我知道广播允许在每台机器上缓存一个只读副本,而不是将它的副本与任务一起发送。
但是,我想知道广播在本地模式下使用时是否有任何巨大影响,因为我没有节点集群。
或者在本地模式下不广播就可以使用吗?我只是想了解它的用法。
Spark 版本 #2.0,Scala 版本 #2.10
本地模式 - 8 核 CPU 64GB RAM
我有如下内容:
case class EmpDim(name: String,age: Int)
empDF
+-----+-------+------+
|EmpId|EmpName|EmpAge|
+-----+-------+------+
| 1| John| 32|
| 2| David| 45|
+-----+-------+------+
deptDF
+------+--------+-----+
|DeptID|DeptName|EmpID|
+------+--------+-----+
| 1| Admin| 1|
| 2| HR| 2|
| 3| Finance| 4|
+------+--------+-----+
val empRDD = empDF.rdd.map(x => (x.getInt(0), EmpDim(x.getString(1), x.getInt(2))))
val lookupMap = empRDD.collectAsMap() //Without Broadcast
val broadCastLookupMap: Broadcast[Map[Int,EmpDim]] = sc.broadcast(empRDD.collectAsMap()) //With Broadcast
def lookup(lookupMap:Map[Int,EmpDim]) = udf[Option[EmpDim],Int]((empID:Int) => lookupMap.lift(empID))
val combinedDF = deptDF.withColumn("lookupEmp",lookup(lookupMap)($"EmpID")) //Without Broadcast
.withColumn("broadCastLookupEmp",lookup(broadCastLookupMap.value)($"EmpID")) //With Broadcast
.withColumn("EmpName",coalesce($"lookupEmp.name",lit("Unknown - No Name to Lookup")))
.withColumn("EmpAge",coalesce($"lookupEmp.age",lit("Unknown - No Age to Lookup")))
.drop("lookupEmp")
.drop("broadCastLookupEmp")
+------+--------+-----+---------------------------+--------------------------+
|DeptID|DeptName|EmpID|EmpName |EmpAge |
+------+--------+-----+---------------------------+--------------------------+
|1 |Admin |1 |John |32 |
|2 |HR |2 |David |45 |
|3 |Finance |4 |Unknown - No Name to Lookup|Unknown - No Age to Lookup|
+------+--------+-----+---------------------------+--------------------------+
在上述情况下,使用广播是否明智,还是有点矫枉过正?请指教
这样用的话广播就没有任何价值了
当你打电话时:
lookup(broadCastLookupMap.value)($"EmpID")
broadCastLookupMap.value
将根据 Scala 替换模型在本地进行评估。
正确的实施方式是:
def lookup(lookupMap: Broadcast[Map[Int, EmpDim]]) = udf[Option[EmpDim],Int](
(empID:Int) => lookupMap.value.lift(empID)
)
并调用:
lookup(broadCastLookupMap)($"EmpID")
这可能会产生一些积极的影响,具体取决于实际的执行计划。本地或非本地模式 - 适用相同的规则
- 如果数据在阶段之间重复使用(显式或隐式),广播可能很有用。
- 如果数据在管道中只使用一次,标准的闭包/参数处理机制就足够了。
这里没有任何迹象表明第一种情况,因此广播应该已过时,但如果您想确定,请使用实时环境测试这两种解决方案并比较结果。
按姓名呼叫也应该有效:
def lookup(lookupMap: => Map[Int,EmpDim]) = udf[Option[EmpDim],Int](
(empID:Int) => lookupMap.lift(empID)
)
我知道广播允许在每台机器上缓存一个只读副本,而不是将它的副本与任务一起发送。 但是,我想知道广播在本地模式下使用时是否有任何巨大影响,因为我没有节点集群。 或者在本地模式下不广播就可以使用吗?我只是想了解它的用法。
Spark 版本 #2.0,Scala 版本 #2.10 本地模式 - 8 核 CPU 64GB RAM
我有如下内容:
case class EmpDim(name: String,age: Int)
empDF
+-----+-------+------+
|EmpId|EmpName|EmpAge|
+-----+-------+------+
| 1| John| 32|
| 2| David| 45|
+-----+-------+------+
deptDF
+------+--------+-----+
|DeptID|DeptName|EmpID|
+------+--------+-----+
| 1| Admin| 1|
| 2| HR| 2|
| 3| Finance| 4|
+------+--------+-----+
val empRDD = empDF.rdd.map(x => (x.getInt(0), EmpDim(x.getString(1), x.getInt(2))))
val lookupMap = empRDD.collectAsMap() //Without Broadcast
val broadCastLookupMap: Broadcast[Map[Int,EmpDim]] = sc.broadcast(empRDD.collectAsMap()) //With Broadcast
def lookup(lookupMap:Map[Int,EmpDim]) = udf[Option[EmpDim],Int]((empID:Int) => lookupMap.lift(empID))
val combinedDF = deptDF.withColumn("lookupEmp",lookup(lookupMap)($"EmpID")) //Without Broadcast
.withColumn("broadCastLookupEmp",lookup(broadCastLookupMap.value)($"EmpID")) //With Broadcast
.withColumn("EmpName",coalesce($"lookupEmp.name",lit("Unknown - No Name to Lookup")))
.withColumn("EmpAge",coalesce($"lookupEmp.age",lit("Unknown - No Age to Lookup")))
.drop("lookupEmp")
.drop("broadCastLookupEmp")
+------+--------+-----+---------------------------+--------------------------+
|DeptID|DeptName|EmpID|EmpName |EmpAge |
+------+--------+-----+---------------------------+--------------------------+
|1 |Admin |1 |John |32 |
|2 |HR |2 |David |45 |
|3 |Finance |4 |Unknown - No Name to Lookup|Unknown - No Age to Lookup|
+------+--------+-----+---------------------------+--------------------------+
在上述情况下,使用广播是否明智,还是有点矫枉过正?请指教
这样用的话广播就没有任何价值了
当你打电话时:
lookup(broadCastLookupMap.value)($"EmpID")
broadCastLookupMap.value
将根据 Scala 替换模型在本地进行评估。
正确的实施方式是:
def lookup(lookupMap: Broadcast[Map[Int, EmpDim]]) = udf[Option[EmpDim],Int](
(empID:Int) => lookupMap.value.lift(empID)
)
并调用:
lookup(broadCastLookupMap)($"EmpID")
这可能会产生一些积极的影响,具体取决于实际的执行计划。本地或非本地模式 - 适用相同的规则
- 如果数据在阶段之间重复使用(显式或隐式),广播可能很有用。
- 如果数据在管道中只使用一次,标准的闭包/参数处理机制就足够了。
这里没有任何迹象表明第一种情况,因此广播应该已过时,但如果您想确定,请使用实时环境测试这两种解决方案并比较结果。
按姓名呼叫也应该有效:
def lookup(lookupMap: => Map[Int,EmpDim]) = udf[Option[EmpDim],Int](
(empID:Int) => lookupMap.lift(empID)
)