如何从 Spark 中的 RDD 和数据帧中过滤?

How to filter from RDDs and DataFrames in Spark?

我有一个 .tsv 文件 pageviews_by_secondtimestamp siterequests 字段组成:

"timestamp"              "site"   "requests"
"2015-03-16T00:09:55"   "mobile"    1595
"2015-03-16T00:10:39"   "mobile"    1544
"2015-03-16T00:19:39"   "desktop"   2460

我希望第一行消失,因为它会导致我必须对数据执行的操作出错。

我尝试过以下方法:

1.Filtering 拆分前的 RDD

val RDD1 = sc.textFile("pageviews_by_second")       
val top_row = RDD1.first() 
//returns: top_row: String = "timestamp"    "site"  "requests"    
val RDD2 = RDD1.filter(x => x!= top_row)
RDD2.first() 
//returns: "2015-03-16T00:09:55"    "mobile"    1595

2.Filtering拆分后的RDD

val RDD1 = sc.textFile("pageviews_by_second").map(_.split("\t")
RDD1.first()  //returns res0: Array[String] = Array("timestamp, 'site", "requests")
val top_row = RDD1.first()
val RDD2 = RDD1.filter(x => x!= top_row)
RDD2.first() //returns: res1: Array[String] = Array("timestamp", "site" ,"requests")
val RDD2 = RDD1.filter(x => x(0)!="timestamp" && x(1)!="site" && x(2)!="requests")
 RDD2.first() //returns: res1: Array[String] = Array("timestamp", "site" ,"requests")

3.Converting 使用 'case class' 并过滤它

到 DataFrame
case class Wiki(timestamp: String, site: String, requests: String)
val DF = sc.textFile("pageviews_by_second").map(_.split("\t")).map(w => Wiki(w(0), w(1), w(2))).toDF()
val top_row = DF.first()
//returns: top_row: org.apache.spark.sql.Row = ["timestamp","site","requests"]
DF.filter(_ => _ != top_row)
//returns: error: missing parameter type
val DF2 = DF.filter(_ => _ != top_row2)

为什么只有第一种方法能够过滤掉第一行,而其他两种方法不能?方法三为什么会报错,如何解决?

您首先需要了解在删除第一行时比较的数据类型。

比较两个字符串将在方法 1 中得出 true 或 false。因此它会过滤掉第一行

在方法 2 中,您正在比较数组的 2 Arrays.Use deep 方法以对 scala 中的数组进行更深入的比较

Method2
val RDD1 = sc.textFile("D:\trial.txt").map(_.split("\t"))
val top_row = RDD1.first()
val RDD2 = RDD1.filter(x => x.deep!= top_row.deep)
RDD2.first().foreach(println(_))

在方法 3 中,您正在比较数据框的两行对象。如果将行转换为 toSeq 后跟 toArray 然后使用 deep 方法过滤掉数据帧的第一行会更好。

//Method 3    
DF.filter(_ => _.toSeq.toArray.deep!=top_row.toSeq.toArray.deep)

如果有帮助,请回复。谢谢!!!

首先,您确实应该使用 spark-csv-package - 它可以在创建 DataFrame(或 rdd)时自动过滤掉 header。您只需指定 :)

其次,rdds 的排序方式与您想象的不同。调用 first 不能保证 return 您的 csv-file 的第一行。这是您的第一个场景,显然您确实获得了第一排,但如果您认为自己在这种情况下很幸运,那会更好。此外,从一个可能非常大的数据集中删除这样的 header 是非常低效的,因为 Spark 需要搜索所有行以过滤掉一行。

如果排序对您的进一步计算很重要,您可以随时执行 zipWithIndex。这样,您就可以对 rdd 进行排序以保留顺序。

有办法移除header,而不是深度比较:

data = sc.textFile('path_to_data') header = data.first() #extract header data = data.filter(lambda x:x !=header) #filter out header

可能与您相关: How do I skip a header from CSV files in Spark?

我发现了另一种方法,它比我使用的过滤方法更有效。将其作为答案发布,因为其他人可能会发现它有帮助:

rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }

来源: