在 Spark 中对 RDD 进行排序
Sorting an RDD in Spark
我有一个列出客户购买的一般物品的数据集。 csv 中的每条记录从左到右列出了客户购买的商品。例如(缩短的样本):
Bicycle, Helmet, Gloves
Shoes, Jumper, Gloves
Television, Hat, Jumper, Playstation 5
我希望将其放入 Scala 的 RDD 中,并对它们进行计数。
case class SalesItemSummary(SalesItemDesc: String, SalesItemCount: String)
val rdd_1 = sc.textFile("Data/SalesItems.csv")
val rdd_2 = rdd_1.flatMap(line => line.split(",")).countByValue();
以上是一个简短的代码示例。第一行是大小写class(还没用)。
第二行从 csv 中获取数据并将其放入 rdd_1 中。很容易。
第三行做 flatmap,用逗号拆分数据,然后对每个进行计数。因此,例如,上面的“手套”和“跳线”旁边会有数字 2。其他 1. 在看起来像元组集合的地方。
到目前为止一切顺利。
接下来,我想排序 rdd_2 以列出购买最多的前 3 件商品。
我可以用 RDD 做这个吗?还是需要把RDD转成dataframe来实现排序?
如果可以,我该怎么做?
如何将第 1 行中的 class 应用于 rdd_2,这似乎是一个元组列表?我应该采用这种方法吗?
提前致谢
案例 class 中的计数应该是一个整数...如果您想将结果保留为 RDD,我建议使用 reduceByKey
而不是 countByValue
其中 returns 一个 Map[String, Long]
而不是一个 RDD。
此外,我建议按 ,
而不是 ,
进行拆分,以避免项目名称中出现前导空格。
case class SalesItemSummary(SalesItemDesc: String, SalesItemCount: Int)
val rdd_1 = sc.textFile("Data/SalesItems.csv")
val rdd_2 = rdd_1.flatMap(_.split(", "))
.map((_, 1))
.reduceByKey(_ + _)
.map(line => SalesItemSummary(line._1, line._2))
rdd_2.collect()
// Array[SalesItemSummary] = Array(SalesItemSummary(Gloves,2), SalesItemSummary(Shoes,1), SalesItemSummary(Television,1), SalesItemSummary(Bicycle,1), SalesItemSummary(Helmet,1), SalesItemSummary(Hat,1), SalesItemSummary(Jumper,2), SalesItemSummary(Playstation 5,1))
对RDD进行排序,可以使用sortBy
:
val top3 = rdd_2.sortBy(_.SalesItemCount, false).take(3)
top3
// Array[SalesItemSummary] = Array(SalesItemSummary(Gloves,2), SalesItemSummary(Jumper,2), SalesItemSummary(Shoes,1))
我有一个列出客户购买的一般物品的数据集。 csv 中的每条记录从左到右列出了客户购买的商品。例如(缩短的样本):
Bicycle, Helmet, Gloves
Shoes, Jumper, Gloves
Television, Hat, Jumper, Playstation 5
我希望将其放入 Scala 的 RDD 中,并对它们进行计数。
case class SalesItemSummary(SalesItemDesc: String, SalesItemCount: String)
val rdd_1 = sc.textFile("Data/SalesItems.csv")
val rdd_2 = rdd_1.flatMap(line => line.split(",")).countByValue();
以上是一个简短的代码示例。第一行是大小写class(还没用)。 第二行从 csv 中获取数据并将其放入 rdd_1 中。很容易。 第三行做 flatmap,用逗号拆分数据,然后对每个进行计数。因此,例如,上面的“手套”和“跳线”旁边会有数字 2。其他 1. 在看起来像元组集合的地方。 到目前为止一切顺利。
接下来,我想排序 rdd_2 以列出购买最多的前 3 件商品。 我可以用 RDD 做这个吗?还是需要把RDD转成dataframe来实现排序? 如果可以,我该怎么做?
如何将第 1 行中的 class 应用于 rdd_2,这似乎是一个元组列表?我应该采用这种方法吗?
提前致谢
案例 class 中的计数应该是一个整数...如果您想将结果保留为 RDD,我建议使用 reduceByKey
而不是 countByValue
其中 returns 一个 Map[String, Long]
而不是一个 RDD。
此外,我建议按 ,
而不是 ,
进行拆分,以避免项目名称中出现前导空格。
case class SalesItemSummary(SalesItemDesc: String, SalesItemCount: Int)
val rdd_1 = sc.textFile("Data/SalesItems.csv")
val rdd_2 = rdd_1.flatMap(_.split(", "))
.map((_, 1))
.reduceByKey(_ + _)
.map(line => SalesItemSummary(line._1, line._2))
rdd_2.collect()
// Array[SalesItemSummary] = Array(SalesItemSummary(Gloves,2), SalesItemSummary(Shoes,1), SalesItemSummary(Television,1), SalesItemSummary(Bicycle,1), SalesItemSummary(Helmet,1), SalesItemSummary(Hat,1), SalesItemSummary(Jumper,2), SalesItemSummary(Playstation 5,1))
对RDD进行排序,可以使用sortBy
:
val top3 = rdd_2.sortBy(_.SalesItemCount, false).take(3)
top3
// Array[SalesItemSummary] = Array(SalesItemSummary(Gloves,2), SalesItemSummary(Jumper,2), SalesItemSummary(Shoes,1))