Spark Scala:GroupByKey 和排序
Spark Scala: GroupByKey and sort
我有一个 RDD
结构如下:
val rdd = RDD[ (category: String, product: String, score: Double) ]
我的objective是根据类别group
数据,然后对每个类别sort
w.r.t。 Tuple 2 (product, score)
的分数。至于现在我的代码是:
val result = rdd.groupByKey.mapValues(v => v.toList.sortBy(-_._2))
事实证明,对于我所拥有的数据来说,这是一项非常昂贵的操作。我希望使用替代方法提高性能。
在不知道您的数据集的情况下很难回答,但是 documentation 有一些线索:groupByKey
性能:
Note: This operation may be very expensive. If you are grouping in
order to perform an aggregation (such as a sum or average) over each
key, using PairRDDFunctions.aggregateByKey or
PairRDDFunctions.reduceByKey will provide much better performance.
所以这取决于您打算如何处理排序后的列表。如果您需要每个列表的全部,那么可能很难在 groupByKey
上进行改进。如果您正在执行某种聚合,那么上面的替代操作(aggregateByKey
、reduceByKey
)可能会更好。
根据列表的大小,可能在排序前使用替代集合(例如可变数组)更有效。
编辑:如果你的类别比较少,可以尝试反复过滤原始RDD,并对每个过滤后的RDD进行排序。虽然总体上完成的工作量相似,但在任何给定时刻使用的内存可能更少。
编辑 2:如果内存不足是个问题,您可以将类别和产品表示为整数 ID 而不是字符串,并且稍后只查找名称。这样,您的主 RDD 可能会小得多。
您的 RDD 在类别上的分布是否公平?根据偏斜因素,您可能会遇到问题。
如果您没有太多键值,请尝试这样的操作:
val rdd: RDD[(String, String, Double)] = sc.parallelize(Seq(("someCategory","a",1.0),("someCategory","b",3.0),("someCategory2","c",4.0)))
rdd.keyBy(_._1).countByKey().foreach(println)
我有一个 RDD
结构如下:
val rdd = RDD[ (category: String, product: String, score: Double) ]
我的objective是根据类别group
数据,然后对每个类别sort
w.r.t。 Tuple 2 (product, score)
的分数。至于现在我的代码是:
val result = rdd.groupByKey.mapValues(v => v.toList.sortBy(-_._2))
事实证明,对于我所拥有的数据来说,这是一项非常昂贵的操作。我希望使用替代方法提高性能。
在不知道您的数据集的情况下很难回答,但是 documentation 有一些线索:groupByKey
性能:
Note: This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will provide much better performance.
所以这取决于您打算如何处理排序后的列表。如果您需要每个列表的全部,那么可能很难在 groupByKey
上进行改进。如果您正在执行某种聚合,那么上面的替代操作(aggregateByKey
、reduceByKey
)可能会更好。
根据列表的大小,可能在排序前使用替代集合(例如可变数组)更有效。
编辑:如果你的类别比较少,可以尝试反复过滤原始RDD,并对每个过滤后的RDD进行排序。虽然总体上完成的工作量相似,但在任何给定时刻使用的内存可能更少。
编辑 2:如果内存不足是个问题,您可以将类别和产品表示为整数 ID 而不是字符串,并且稍后只查找名称。这样,您的主 RDD 可能会小得多。
您的 RDD 在类别上的分布是否公平?根据偏斜因素,您可能会遇到问题。 如果您没有太多键值,请尝试这样的操作:
val rdd: RDD[(String, String, Double)] = sc.parallelize(Seq(("someCategory","a",1.0),("someCategory","b",3.0),("someCategory2","c",4.0)))
rdd.keyBy(_._1).countByKey().foreach(println)