我们如何对来自 Spark RDD 的数据进行排序和分组?
How we can sort and group data from the Spark RDDs?
data.csv 文件中的数据是:
07:36:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:33:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:34:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:35:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:44:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:45:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:46:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:47:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:48:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:36:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:38:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:39:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:50:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:51:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:52:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:53:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:54:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:40:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
这是我的代码:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object ScalaApp {
def main(args: Array[String]) {
val sc = new SparkContext("local[4]", "Program")
// we take the raw data in CSV format and convert it into a
val data = sc.textFile("data.csv")
.map(line => line.split(","))
.map(GroupRecord => (GroupRecord(0),
GroupRecord(1),GroupRecord(2),GroupRecord(3),GroupRecord(4),GroupRecord(5)))
val numPurchases = data.count()
val d1=data.groupByKey(GroupRecord(2)) // here is the error
println("No: " + numPurchases)
println("Grouped Data" + d1)
}
}
我只想要按源 IP(第 2 列)分组并按时间排序(第 1 列)的相同数据。
所以我需要的数据是:
07:33:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:34:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:35:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:38:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:39:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:40:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:44:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:45:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:46:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:47:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:50:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:51:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:52:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:53:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:54:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:36:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
但是我的代码有问题所以请帮助我!
你的问题是你的第二个 map
创建了一个 Tuple6
而不是键值对,如果你想进行 xxxByKey 操作,这是必需的。如果你想按第 2 列分组,你应该将 GroupRecord(1)
作为你的键和其余值,然后调用 groupByKey
,像这样:
data
.map(GroupRecord => (GroupRecord(1),(GroupRecord(0),GroupRecord(2),GroupRecord(3),GroupRecord(4),GroupRecord(5)))
.groupByKey()
正如 Glennie 指出的那样,您不会为 groupByKey
操作创建键值对。但是,您也可以使用 groupBy(_._3)
来获得相同的结果。为了根据第一列对每个组进行排序,您可以在分组后应用 flatMapValues
对每个组中的项目进行排序。以下代码正是这样做的:
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
val sc = new SparkContext(sparkConf)
val data = sc.textFile("data.csv")
.map(line => line.split("\s+"))
.map(GroupRecord => (GroupRecord(2), (GroupRecord(0), GroupRecord(1),GroupRecord(2),GroupRecord(3),GroupRecord(4),GroupRecord(5))))
// sort the groups by the first tuple field
val result = data.groupByKey.flatMapValues(x => x.toList.sortBy(_._1))
// assign the partition ID to each item to see that each group is sorted
val resultWithPartitionID = result.mapPartitionsWithIndex((id, it) => it.map(x => (id, x)))
// print the contents of the RDD, elements of different partitions might be interleaved
resultWithPartitionID foreach println
val collectedResult = resultWithPartitionID.collect.sortBy(_._1).map(_._2)
// print collected results
println(collectedResult.mkString("\n"))
}
这里我们需要将其转换为键、值对以应用 GroupByKey 机制,然后值将转换为可迭代集并对每个键的值应用排序,我们需要将其转换为序列和然后应用 sortby 功能,之后 flatMap 函数会将顺序值展平为字符串集。
Data.csv ->
07:36:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:33:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:34:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:35:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:44:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:45:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:46:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:47:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:48:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:36:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:38:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:39:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:50:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:51:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:52:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:53:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:54:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:40:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
代码 ->
val data = sc.textFile("src/Data.csv")
.map(line => {
val GroupRecord = line.split("\t")
((GroupRecord(1)), (GroupRecord(0), GroupRecord(2), GroupRecord(3), GroupRecord(4), GroupRecord(5)))
})
val numPurchases = data.count()
val d1 = data.groupByKey().map(f => (f._1, f._2.toSeq.sortBy(f => f._1))).flatMapValues(f => f).map(f => (f._2._1, f._1, f._2._2, f._2._3, f._2._4, f._2._5))
d1 foreach (println(_))
println("No: " + numPurchases)
上述解决方案可以正常工作。但是当数据增加到大尺寸时我不确定,因为你需要处理重新洗牌
最好的方法是创建一个数据框并使用 sqlContext 按 IP 地址和时间排序
data.csv 文件中的数据是:
07:36:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:33:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:34:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:35:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:44:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:45:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:46:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:47:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:48:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:36:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:38:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:39:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:50:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:51:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:52:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:53:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:54:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:40:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
这是我的代码:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object ScalaApp {
def main(args: Array[String]) {
val sc = new SparkContext("local[4]", "Program")
// we take the raw data in CSV format and convert it into a
val data = sc.textFile("data.csv")
.map(line => line.split(","))
.map(GroupRecord => (GroupRecord(0),
GroupRecord(1),GroupRecord(2),GroupRecord(3),GroupRecord(4),GroupRecord(5)))
val numPurchases = data.count()
val d1=data.groupByKey(GroupRecord(2)) // here is the error
println("No: " + numPurchases)
println("Grouped Data" + d1)
}
}
我只想要按源 IP(第 2 列)分组并按时间排序(第 1 列)的相同数据。 所以我需要的数据是:
07:33:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:34:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:35:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:38:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:39:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:40:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:44:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:45:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:46:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:47:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:50:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:51:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:52:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:53:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:54:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:36:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
但是我的代码有问题所以请帮助我!
你的问题是你的第二个 map
创建了一个 Tuple6
而不是键值对,如果你想进行 xxxByKey 操作,这是必需的。如果你想按第 2 列分组,你应该将 GroupRecord(1)
作为你的键和其余值,然后调用 groupByKey
,像这样:
data
.map(GroupRecord => (GroupRecord(1),(GroupRecord(0),GroupRecord(2),GroupRecord(3),GroupRecord(4),GroupRecord(5)))
.groupByKey()
正如 Glennie 指出的那样,您不会为 groupByKey
操作创建键值对。但是,您也可以使用 groupBy(_._3)
来获得相同的结果。为了根据第一列对每个组进行排序,您可以在分组后应用 flatMapValues
对每个组中的项目进行排序。以下代码正是这样做的:
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
val sc = new SparkContext(sparkConf)
val data = sc.textFile("data.csv")
.map(line => line.split("\s+"))
.map(GroupRecord => (GroupRecord(2), (GroupRecord(0), GroupRecord(1),GroupRecord(2),GroupRecord(3),GroupRecord(4),GroupRecord(5))))
// sort the groups by the first tuple field
val result = data.groupByKey.flatMapValues(x => x.toList.sortBy(_._1))
// assign the partition ID to each item to see that each group is sorted
val resultWithPartitionID = result.mapPartitionsWithIndex((id, it) => it.map(x => (id, x)))
// print the contents of the RDD, elements of different partitions might be interleaved
resultWithPartitionID foreach println
val collectedResult = resultWithPartitionID.collect.sortBy(_._1).map(_._2)
// print collected results
println(collectedResult.mkString("\n"))
}
这里我们需要将其转换为键、值对以应用 GroupByKey 机制,然后值将转换为可迭代集并对每个键的值应用排序,我们需要将其转换为序列和然后应用 sortby 功能,之后 flatMap 函数会将顺序值展平为字符串集。
Data.csv ->
07:36:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:33:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:34:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:35:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:44:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:45:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:46:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:47:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:48:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:36:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.107 104.70.250.141 80 57188 0.48
07:37:00 PM 172.20.16.105 104.70.250.141 80 57188 0.66
07:38:00 PM 172.20.16.105 104.70.250.141 80 57188 0.47
07:39:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
07:50:00 PM 172.20.16.106 104.70.250.141 80 57188 0.49
07:51:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:52:00 PM 172.20.16.106 104.70.250.141 80 57188 0.33
07:53:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:54:00 PM 172.20.16.106 104.70.250.141 80 57188 0.48
07:40:00 PM 172.20.16.105 104.70.250.141 80 57188 0.48
代码 ->
val data = sc.textFile("src/Data.csv")
.map(line => {
val GroupRecord = line.split("\t")
((GroupRecord(1)), (GroupRecord(0), GroupRecord(2), GroupRecord(3), GroupRecord(4), GroupRecord(5)))
})
val numPurchases = data.count()
val d1 = data.groupByKey().map(f => (f._1, f._2.toSeq.sortBy(f => f._1))).flatMapValues(f => f).map(f => (f._2._1, f._1, f._2._2, f._2._3, f._2._4, f._2._5))
d1 foreach (println(_))
println("No: " + numPurchases)
上述解决方案可以正常工作。但是当数据增加到大尺寸时我不确定,因为你需要处理重新洗牌
最好的方法是创建一个数据框并使用 sqlContext 按 IP 地址和时间排序