如何从 Spark 中的 RDD 和数据帧中过滤?
How to filter from RDDs and DataFrames in Spark?
我有一个 .tsv
文件 pageviews_by_second
由 timestamp
site
和 requests
字段组成:
"timestamp" "site" "requests"
"2015-03-16T00:09:55" "mobile" 1595
"2015-03-16T00:10:39" "mobile" 1544
"2015-03-16T00:19:39" "desktop" 2460
我希望第一行消失,因为它会导致我必须对数据执行的操作出错。
我尝试过以下方法:
1.Filtering 拆分前的 RDD
val RDD1 = sc.textFile("pageviews_by_second")
val top_row = RDD1.first()
//returns: top_row: String = "timestamp" "site" "requests"
val RDD2 = RDD1.filter(x => x!= top_row)
RDD2.first()
//returns: "2015-03-16T00:09:55" "mobile" 1595
2.Filtering拆分后的RDD
val RDD1 = sc.textFile("pageviews_by_second").map(_.split("\t")
RDD1.first() //returns res0: Array[String] = Array("timestamp, 'site", "requests")
val top_row = RDD1.first()
val RDD2 = RDD1.filter(x => x!= top_row)
RDD2.first() //returns: res1: Array[String] = Array("timestamp", "site" ,"requests")
val RDD2 = RDD1.filter(x => x(0)!="timestamp" && x(1)!="site" && x(2)!="requests")
RDD2.first() //returns: res1: Array[String] = Array("timestamp", "site" ,"requests")
3.Converting 使用 'case class' 并过滤它
到 DataFrame
case class Wiki(timestamp: String, site: String, requests: String)
val DF = sc.textFile("pageviews_by_second").map(_.split("\t")).map(w => Wiki(w(0), w(1), w(2))).toDF()
val top_row = DF.first()
//returns: top_row: org.apache.spark.sql.Row = ["timestamp","site","requests"]
DF.filter(_ => _ != top_row)
//returns: error: missing parameter type
val DF2 = DF.filter(_ => _ != top_row2)
为什么只有第一种方法能够过滤掉第一行,而其他两种方法不能?方法三为什么会报错,如何解决?
您首先需要了解在删除第一行时比较的数据类型。
比较两个字符串将在方法 1 中得出 true 或 false。因此它会过滤掉第一行
在方法 2 中,您正在比较数组的 2 Arrays.Use deep
方法以对 scala 中的数组进行更深入的比较
Method2
val RDD1 = sc.textFile("D:\trial.txt").map(_.split("\t"))
val top_row = RDD1.first()
val RDD2 = RDD1.filter(x => x.deep!= top_row.deep)
RDD2.first().foreach(println(_))
在方法 3 中,您正在比较数据框的两行对象。如果将行转换为 toSeq
后跟 toArray
然后使用 deep
方法过滤掉数据帧的第一行会更好。
//Method 3
DF.filter(_ => _.toSeq.toArray.deep!=top_row.toSeq.toArray.deep)
如果有帮助,请回复。谢谢!!!
首先,您确实应该使用 spark-csv
-package - 它可以在创建 DataFrame
(或 rdd
)时自动过滤掉 header。您只需指定 :)
其次,rdds
的排序方式与您想象的不同。调用 first
不能保证 return 您的 csv-file 的第一行。这是您的第一个场景,显然您确实获得了第一排,但如果您认为自己在这种情况下很幸运,那会更好。此外,从一个可能非常大的数据集中删除这样的 header 是非常低效的,因为 Spark 需要搜索所有行以过滤掉一行。
如果排序对您的进一步计算很重要,您可以随时执行 zipWithIndex
。这样,您就可以对 rdd
进行排序以保留顺序。
有办法移除header,而不是深度比较:
data = sc.textFile('path_to_data')
header = data.first() #extract header
data = data.filter(lambda x:x !=header) #filter out header
我发现了另一种方法,它比我使用的过滤方法更有效。将其作为答案发布,因为其他人可能会发现它有帮助:
rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
来源:
我有一个 .tsv
文件 pageviews_by_second
由 timestamp
site
和 requests
字段组成:
"timestamp" "site" "requests"
"2015-03-16T00:09:55" "mobile" 1595
"2015-03-16T00:10:39" "mobile" 1544
"2015-03-16T00:19:39" "desktop" 2460
我希望第一行消失,因为它会导致我必须对数据执行的操作出错。
我尝试过以下方法:
1.Filtering 拆分前的 RDD
val RDD1 = sc.textFile("pageviews_by_second")
val top_row = RDD1.first()
//returns: top_row: String = "timestamp" "site" "requests"
val RDD2 = RDD1.filter(x => x!= top_row)
RDD2.first()
//returns: "2015-03-16T00:09:55" "mobile" 1595
2.Filtering拆分后的RDD
val RDD1 = sc.textFile("pageviews_by_second").map(_.split("\t")
RDD1.first() //returns res0: Array[String] = Array("timestamp, 'site", "requests")
val top_row = RDD1.first()
val RDD2 = RDD1.filter(x => x!= top_row)
RDD2.first() //returns: res1: Array[String] = Array("timestamp", "site" ,"requests")
val RDD2 = RDD1.filter(x => x(0)!="timestamp" && x(1)!="site" && x(2)!="requests")
RDD2.first() //returns: res1: Array[String] = Array("timestamp", "site" ,"requests")
3.Converting 使用 'case class' 并过滤它
到 DataFramecase class Wiki(timestamp: String, site: String, requests: String)
val DF = sc.textFile("pageviews_by_second").map(_.split("\t")).map(w => Wiki(w(0), w(1), w(2))).toDF()
val top_row = DF.first()
//returns: top_row: org.apache.spark.sql.Row = ["timestamp","site","requests"]
DF.filter(_ => _ != top_row)
//returns: error: missing parameter type
val DF2 = DF.filter(_ => _ != top_row2)
为什么只有第一种方法能够过滤掉第一行,而其他两种方法不能?方法三为什么会报错,如何解决?
您首先需要了解在删除第一行时比较的数据类型。
比较两个字符串将在方法 1 中得出 true 或 false。因此它会过滤掉第一行
在方法 2 中,您正在比较数组的 2 Arrays.Use deep
方法以对 scala 中的数组进行更深入的比较
Method2
val RDD1 = sc.textFile("D:\trial.txt").map(_.split("\t"))
val top_row = RDD1.first()
val RDD2 = RDD1.filter(x => x.deep!= top_row.deep)
RDD2.first().foreach(println(_))
在方法 3 中,您正在比较数据框的两行对象。如果将行转换为 toSeq
后跟 toArray
然后使用 deep
方法过滤掉数据帧的第一行会更好。
//Method 3
DF.filter(_ => _.toSeq.toArray.deep!=top_row.toSeq.toArray.deep)
如果有帮助,请回复。谢谢!!!
首先,您确实应该使用 spark-csv
-package - 它可以在创建 DataFrame
(或 rdd
)时自动过滤掉 header。您只需指定 :)
其次,rdds
的排序方式与您想象的不同。调用 first
不能保证 return 您的 csv-file 的第一行。这是您的第一个场景,显然您确实获得了第一排,但如果您认为自己在这种情况下很幸运,那会更好。此外,从一个可能非常大的数据集中删除这样的 header 是非常低效的,因为 Spark 需要搜索所有行以过滤掉一行。
如果排序对您的进一步计算很重要,您可以随时执行 zipWithIndex
。这样,您就可以对 rdd
进行排序以保留顺序。
有办法移除header,而不是深度比较:
data = sc.textFile('path_to_data')
header = data.first() #extract header
data = data.filter(lambda x:x !=header) #filter out header
我发现了另一种方法,它比我使用的过滤方法更有效。将其作为答案发布,因为其他人可能会发现它有帮助:
rdd.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }
来源: