在 Spark 数据帧中查找
Lookup in Spark dataframes
我正在使用 Spark 1.6,我想知道如何在数据帧中实现查找。
我有两个数据框员工和部门。
员工数据框
-------------------
Emp Id | Emp Name
------------------
1 | john
2 | David
部门数据框
--------------------
Dept Id | Dept Name | Emp Id
-----------------------------
1 | Admin | 1
2 | HR | 2
我想查找从员工 table 到部门 table 的 emp id 并获取部门名称。所以,结果集将是
Emp Id | Dept Name
-------------------
1 | Admin
2 | HR
如何在 SPARK 中实现此查找 UDF 功能。我不想在两个数据帧上都使用 JOIN。
正如您所说,您已经拥有 Dataframes,那么按照以下步骤操作非常简单:
1)创建一个sqlcontext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
2) 为所有 3 个创建临时表,例如:
EmployeeDataframe.createOrReplaceTempView("EmpTable")
3) 使用 MySQL 查询
进行查询
val MatchingDetails = sqlContext.sql("SELECT DISTINCT E.EmpID, DeptName FROM EmpTable E inner join DeptTable G on " +
"E.EmpID=g.EmpID")
正如评论中已经提到的那样,加入数据框是可行的方法。
您可以使用查找,但我认为没有 "distributed" 解决方案,即您必须将查找-table 收集到驱动程序内存中。另请注意,此方法假定 EmpID 是唯一的:
import org.apache.spark.sql.functions._
import sqlContext.implicits._
import scala.collection.Map
val emp = Seq((1,"John"),(2,"David"))
val deps = Seq((1,"Admin",1),(2,"HR",2))
val empRdd = sc.parallelize(emp)
val depsDF = sc.parallelize(deps).toDF("DepID","Name","EmpID")
val lookupMap = empRdd.collectAsMap()
def lookup(lookupMap:Map[Int,String]) = udf((empID:Int) => lookupMap.get(empID))
val combinedDF = depsDF
.withColumn("empNames",lookup(lookupMap)($"EmpID"))
我最初的想法是将 empRdd
传递给 UDF 并使用 PairRDD
上定义的 lookup
方法,但这当然行不通,因为你不能有 spark 动作(即 lookup
)在转换(即 UDF)中。
编辑:
如果您的 empDf 有多个列(例如姓名、年龄),您可以使用此
val empRdd = empDf.rdd.map{row =>
(row.getInt(0),(row.getString(1),row.getInt(2)))}
val lookupMap = empRdd.collectAsMap()
def lookup(lookupMap:Map[Int,(String,Int)]) =
udf((empID:Int) => lookupMap.lift(empID))
depsDF
.withColumn("lookup",lookup(lookupMap)($"EmpID"))
.withColumn("empName",$"lookup._1")
.withColumn("empAge",$"lookup._2")
.drop($"lookup")
.show()
从一些 "lookup" 数据开始,有两种方法:
方法 #1 -- 使用查找 DataFrame
// use a DataFrame (via a join)
val lookupDF = sc.parallelize(Seq(
("banana", "yellow"),
("apple", "red"),
("grape", "purple"),
("blueberry","blue")
)).toDF("SomeKeys","SomeValues")
方法 #2 -- 在 UDF 中使用映射
// turn the above DataFrame into a map which a UDF uses
val Keys = lookupDF.select("SomeKeys").collect().map(_(0).toString).toList
val Values = lookupDF.select("SomeValues").collect().map(_(0).toString).toList
val KeyValueMap = Keys.zip(Values).toMap
def ThingToColor(key: String): String = {
if (key == null) return ""
val firstword = key.split(" ")(0) // fragile!
val result: String = KeyValueMap.getOrElse(firstword,"not found!")
return (result)
}
val ThingToColorUDF = udf( ThingToColor(_: String): String )
获取将要查找的事物的示例数据框:
val thingsDF = sc.parallelize(Seq(
("blueberry muffin"),
("grape nuts"),
("apple pie"),
("rutabaga pudding")
)).toDF("SomeThings")
方法#1是加入查找DataFrame
在这里,rlike 正在做匹配。 null 出现在不起作用的地方。查找 DataFrame 的两列都被添加。
val result_1_DF = thingsDF.join(lookupDF, expr("SomeThings rlike SomeKeys"),
"left_outer")
方法#2是使用UDF
添加一列
这里只添加了1列。并且 UDF 可以 return 一个非 Null 值。但是,如果查找数据非常大,可能无法 "serialize" 按要求发送给集群中的工作人员。
val result_2_DF = thingsDF.withColumn("AddValues",ThingToColorUDF($"SomeThings"))
这给你:
在我的例子中,我有一些超过 100 万个值的查找数据,所以方法 #1 是我唯一的选择。
我正在使用 Spark 1.6,我想知道如何在数据帧中实现查找。
我有两个数据框员工和部门。
员工数据框
------------------- Emp Id | Emp Name ------------------ 1 | john 2 | David
部门数据框
-------------------- Dept Id | Dept Name | Emp Id ----------------------------- 1 | Admin | 1 2 | HR | 2
我想查找从员工 table 到部门 table 的 emp id 并获取部门名称。所以,结果集将是
Emp Id | Dept Name
-------------------
1 | Admin
2 | HR
如何在 SPARK 中实现此查找 UDF 功能。我不想在两个数据帧上都使用 JOIN。
正如您所说,您已经拥有 Dataframes,那么按照以下步骤操作非常简单:
1)创建一个sqlcontext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
2) 为所有 3 个创建临时表,例如:
EmployeeDataframe.createOrReplaceTempView("EmpTable")
3) 使用 MySQL 查询
进行查询val MatchingDetails = sqlContext.sql("SELECT DISTINCT E.EmpID, DeptName FROM EmpTable E inner join DeptTable G on " +
"E.EmpID=g.EmpID")
正如评论中已经提到的那样,加入数据框是可行的方法。
您可以使用查找,但我认为没有 "distributed" 解决方案,即您必须将查找-table 收集到驱动程序内存中。另请注意,此方法假定 EmpID 是唯一的:
import org.apache.spark.sql.functions._
import sqlContext.implicits._
import scala.collection.Map
val emp = Seq((1,"John"),(2,"David"))
val deps = Seq((1,"Admin",1),(2,"HR",2))
val empRdd = sc.parallelize(emp)
val depsDF = sc.parallelize(deps).toDF("DepID","Name","EmpID")
val lookupMap = empRdd.collectAsMap()
def lookup(lookupMap:Map[Int,String]) = udf((empID:Int) => lookupMap.get(empID))
val combinedDF = depsDF
.withColumn("empNames",lookup(lookupMap)($"EmpID"))
我最初的想法是将 empRdd
传递给 UDF 并使用 PairRDD
上定义的 lookup
方法,但这当然行不通,因为你不能有 spark 动作(即 lookup
)在转换(即 UDF)中。
编辑:
如果您的 empDf 有多个列(例如姓名、年龄),您可以使用此
val empRdd = empDf.rdd.map{row =>
(row.getInt(0),(row.getString(1),row.getInt(2)))}
val lookupMap = empRdd.collectAsMap()
def lookup(lookupMap:Map[Int,(String,Int)]) =
udf((empID:Int) => lookupMap.lift(empID))
depsDF
.withColumn("lookup",lookup(lookupMap)($"EmpID"))
.withColumn("empName",$"lookup._1")
.withColumn("empAge",$"lookup._2")
.drop($"lookup")
.show()
从一些 "lookup" 数据开始,有两种方法:
方法 #1 -- 使用查找 DataFrame
// use a DataFrame (via a join)
val lookupDF = sc.parallelize(Seq(
("banana", "yellow"),
("apple", "red"),
("grape", "purple"),
("blueberry","blue")
)).toDF("SomeKeys","SomeValues")
方法 #2 -- 在 UDF 中使用映射
// turn the above DataFrame into a map which a UDF uses
val Keys = lookupDF.select("SomeKeys").collect().map(_(0).toString).toList
val Values = lookupDF.select("SomeValues").collect().map(_(0).toString).toList
val KeyValueMap = Keys.zip(Values).toMap
def ThingToColor(key: String): String = {
if (key == null) return ""
val firstword = key.split(" ")(0) // fragile!
val result: String = KeyValueMap.getOrElse(firstword,"not found!")
return (result)
}
val ThingToColorUDF = udf( ThingToColor(_: String): String )
获取将要查找的事物的示例数据框:
val thingsDF = sc.parallelize(Seq(
("blueberry muffin"),
("grape nuts"),
("apple pie"),
("rutabaga pudding")
)).toDF("SomeThings")
方法#1是加入查找DataFrame
在这里,rlike 正在做匹配。 null 出现在不起作用的地方。查找 DataFrame 的两列都被添加。
val result_1_DF = thingsDF.join(lookupDF, expr("SomeThings rlike SomeKeys"),
"left_outer")
方法#2是使用UDF
添加一列这里只添加了1列。并且 UDF 可以 return 一个非 Null 值。但是,如果查找数据非常大,可能无法 "serialize" 按要求发送给集群中的工作人员。
val result_2_DF = thingsDF.withColumn("AddValues",ThingToColorUDF($"SomeThings"))
这给你:
在我的例子中,我有一些超过 100 万个值的查找数据,所以方法 #1 是我唯一的选择。