Spark: Sort records in groups?
I have a set of records which I need to:
1) group by 'date', 'city' and 'kind'
2) sort each group by 'prize'
Here is my code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Sort {

  case class Record(name: String, day: String, kind: String, city: String, prize: Int)

  val recs = Array(
    Record("n1", "d1", "k1", "c1", 10),
    Record("n1", "d1", "k1", "c1", 9),
    Record("n1", "d1", "k1", "c1", 8),
    Record("n2", "d2", "k2", "c2", 1),
    Record("n2", "d2", "k2", "c2", 2),
    Record("n2", "d2", "k2", "c2", 3)
  )

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Test")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val rs = sc.parallelize(recs)
    val rsGrp = rs.groupBy(r => (r.day, r.kind, r.city)).map(_._2)
    val x = rsGrp.map{r =>
      val lst = r.toList
      lst.map{e => (e.prize, e)}
    }
    x.sortByKey()
  }
}
When I try to sort the groups I get an error:
value sortByKey is not a member of org.apache.spark.rdd.RDD[List[(Int, Sort.Record)]]
What is wrong? How can I sort the groups?
You need to define a key and then use mapValues to sort the values of each key.
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._

object Sort {

  case class Record(name: String, day: String, kind: String, city: String, prize: Int)

  // Define your data (the recs array from the question)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Test")
      .setMaster("local")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val rs = sc.parallelize(recs)

    // Generate the pair RDD necessary to call groupByKey, and group it
    val key: RDD[((String, String, String), Iterable[Record])] =
      rs.keyBy(r => (r.day, r.city, r.kind)).groupByKey

    // Once grouped, sort the values of each key
    val values: RDD[((String, String, String), List[Record])] =
      key.mapValues(iter => iter.toList.sortBy(_.prize))

    // Print the result
    values.collect.foreach(println)
  }
}
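For what it's worth, the per-group ordering is controlled entirely by the Ordering passed to sortBy inside mapValues. A minimal sketch of a descending variant (the name valuesDesc is only illustrative):

// Sort each group by prize in descending order instead of ascending
val valuesDesc: RDD[((String, String, String), List[Record])] =
  key.mapValues(iter => iter.toList.sortBy(_.prize)(Ordering[Int].reverse))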
Replace map with flatMap:
val x = rsGrp.flatMap{r =>
  val lst = r.toList
  lst.map{e => (e.prize, e)}
}
This will give you an
org.apache.spark.rdd.RDD[(Int, Record)] = FlatMappedRDD[10]
and then you can call sortBy(_._1) on that RDD.
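Putting that together, a rough sketch of the corrected pipeline (reusing rsGrp from the question's code; older Spark versions may need import org.apache.spark.SparkContext._ for the pair-RDD implicits):

// Flatten each group into (prize, record) pairs so we end up with a pair RDD
val x = rsGrp.flatMap { r =>
  r.toList.map(e => (e.prize, e))
}
// Now that x is an RDD[(Int, Record)], either of these works:
val byPrize    = x.sortBy(_._1)   // sort by the first tuple element (the prize)
val byPrizeKey = x.sortByKey()    // or use the pair-RDD sortByKey

Note that this sorts all records globally by prize; the per-group structure produced by groupBy is flattened away in the process.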
As an alternative to @gasparms' solution, I think you can try a filter followed by an rdd.sortBy operation: for each key you filter the records matching that key and sort them. The prerequisite is that you need to keep track of all the keys (the filter combinations), which you can also build up while iterating over the records. A rough sketch follows below.
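A minimal sketch of that idea, assuming the distinct keys are simply collected up front (the names keys and sortedPerKey are only illustrative):

// Collect the distinct (day, city, kind) combinations first
val keys = rs.map(r => (r.day, r.city, r.kind)).distinct.collect

// For each key, filter the matching records and sort them by prize
val sortedPerKey = keys.map { k =>
  k -> rs.filter(r => (r.day, r.city, r.kind) == k).sortBy(_.prize).collect.toList
}

Keep in mind that this launches one Spark job per key, so it is only practical when the number of key combinations is small.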
groupByKey is expensive; it has two implications:
- Most of the data gets shuffled, on average to the remaining N-1 partitions.
- All of the records of the same key are loaded into memory on a single executor, which can cause memory errors.
Depending on your use case there are better options:
- If you don't care about the ordering, use reduceByKey or aggregateByKey.
- If you only want to group and sort without any transformation, prefer repartitionAndSortWithinPartitions (Spark 1.3.0+, see http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.OrderedRDDFunctions), but be very careful about which partitioner you specify and test it, because you are now relying on side effects that may change behaviour in a different environment. See also the examples in this repository: https://github.com/sryza/aas/blob/master/ch08-geotime/src/main/scala/com/cloudera/datascience/geotime/RunGeoTime.scala. A sketch of this approach follows after this list.
- If you are applying a transformation or a non-reducible aggregation (folds or scans) to the iterable of sorted records, check out the spark-sorted library: https://github.com/tresata/spark-sorted. It provides 3 APIs for paired RDDs: mapStreamByKey, foldLeftByKey and scanLeftByKey.
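A rough sketch of that secondary-sort pattern, assuming a hash partitioner on the grouping fields and an arbitrary partition count of 4 (both the partitioner and the count would need to be adapted and tested for real data):

import org.apache.spark.{HashPartitioner, Partitioner}

// Partition only by the (day, city, kind) part of the composite key,
// so all records of one group end up in the same partition.
class GroupPartitioner(partitions: Int) extends Partitioner {
  private val hash = new HashPartitioner(partitions)
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (group, _) => hash.getPartition(group)
  }
}

// Composite key: ((day, city, kind), prize); the implicit tuple Orderings
// sort lexicographically, i.e. first by group, then by prize.
val keyed = rs.map(r => ((r.day, r.city, r.kind), r.prize) -> r)
val sortedWithinGroups = keyed.repartitionAndSortWithinPartitions(new GroupPartitioner(4))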