比较spark中两个RDD中的数据
Compare data in two RDD in spark
我可以使用以下代码在两个 RDD 中打印数据。
usersRDD.foreach(println)
empRDD.foreach(println)
我需要比较两个 RDD 中的数据。如何迭代和比较一个 RDD 中的字段数据与另一个 RDD 中的字段数据。例如:迭代记录并检查 userRDD
中的姓名和年龄是否在 empRDD
中有匹配的记录,如果没有则放入单独的 RDD.
我尝试使用 userRDD.substract(empRDD)
,但它正在比较所有字段。
您需要在每个 RDD 中键入数据,以便有一些东西可以加入记录。看看 groupBy
例如。然后你 join
结果 RDD。对于每个键,您都可以获得两者的匹配值。如果您有兴趣查找不匹配的密钥,请使用 leftOuterJoin
,如下所示:
// Returns the entries in userRDD that have no corresponding key in empRDD.
def nonEmp(userRDD: RDD[(String, String)], empRDD: RDD[(String, String)]) = {
userRDD.leftOuterJoin(empRDD).collect {
case (name, (age, None)) => name -> age
}
}
当然上面的解法是完整正确的!只有一个提议,当且仅当 RDD 是同步的(相同的行具有相同的键)。您可以使用分布式解决方案并通过以下经过测试的解决方案仅使用火花转换来利用并行性:
def distrCompare(left: RDD[(Int,Int)], right: RDD[(Int,Int)]): Boolean = {
val rdd1 = left.join(right).map{case(k, (lv,rv)) => (k,lv-rv)}
val rdd2 = rdd1.filter{case(k,v)=>(v!=0)}
var equal = true;
rdd2.map{
case(k,v)=> if(v!=0) equal = false
}
return equal
}
您可以在"join"中选择分区数。
我可以使用以下代码在两个 RDD 中打印数据。
usersRDD.foreach(println)
empRDD.foreach(println)
我需要比较两个 RDD 中的数据。如何迭代和比较一个 RDD 中的字段数据与另一个 RDD 中的字段数据。例如:迭代记录并检查 userRDD
中的姓名和年龄是否在 empRDD
中有匹配的记录,如果没有则放入单独的 RDD.
我尝试使用 userRDD.substract(empRDD)
,但它正在比较所有字段。
您需要在每个 RDD 中键入数据,以便有一些东西可以加入记录。看看 groupBy
例如。然后你 join
结果 RDD。对于每个键,您都可以获得两者的匹配值。如果您有兴趣查找不匹配的密钥,请使用 leftOuterJoin
,如下所示:
// Returns the entries in userRDD that have no corresponding key in empRDD.
def nonEmp(userRDD: RDD[(String, String)], empRDD: RDD[(String, String)]) = {
userRDD.leftOuterJoin(empRDD).collect {
case (name, (age, None)) => name -> age
}
}
当然上面的解法是完整正确的!只有一个提议,当且仅当 RDD 是同步的(相同的行具有相同的键)。您可以使用分布式解决方案并通过以下经过测试的解决方案仅使用火花转换来利用并行性:
def distrCompare(left: RDD[(Int,Int)], right: RDD[(Int,Int)]): Boolean = {
val rdd1 = left.join(right).map{case(k, (lv,rv)) => (k,lv-rv)}
val rdd2 = rdd1.filter{case(k,v)=>(v!=0)}
var equal = true;
rdd2.map{
case(k,v)=> if(v!=0) equal = false
}
return equal
}
您可以在"join"中选择分区数。