Spark column-wise word count
We are trying to generate column-wise statistics of our dataset in Spark, besides using the summary function from the statistics library. We use the following procedure:
- We identify the columns that hold string values.
- We generate key-value pairs over the whole dataset, with the column number as the key and the column's value as the value.
- We generate a new map of the format (K, V) -> ((K, V), 1).
Then we use reduceByKey to find the counts of all the unique values across all the columns. We cache this output to reduce further computation time.
In the next step we iterate over the columns with a for loop to compute the statistics of every column.
We are trying to replace this for loop with another map-reduce step, but we cannot find a way to do it. Doing so would let us generate the column statistics for all columns in a single pass. The for-loop approach runs sequentially, which makes it very slow.
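As a tiny stand-alone illustration of the pairing and counting steps described above (plain Scala collections rather than Spark, with made-up sample rows):

// Each row is split into (column index, value) pairs, mapped to ((column, value), 1),
// and the ones are summed, giving the occurrence count of every distinct value per column.
val rows = Seq("apple,10", "pear,10", "apple,20")
val pairs = rows.flatMap(_.split(",").zipWithIndex.map { case (v, col) => (col, v) })
val counts = pairs.map(kv => (kv, 1L)).groupBy(_._1).mapValues(_.map(_._2).sum)
// counts: Map((0,apple) -> 2, (0,pear) -> 1, (1,10) -> 2, (1,20) -> 1)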
Code:
//drops the header row
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    if (idx == 0) {
      lines.drop(1)
    }
    lines
  })
}

def retAtrTuple(x: String) = {
  val newX = x.split(",")
  for (h <- 0 until newX.length)
    yield (h, newX(h))
}

val line = sc.textFile("hdfs://.../myfile.csv")
val withoutHeader: RDD[String] = dropHeader(line)
val kvPairs = withoutHeader.flatMap(retAtrTuple) // key-value pairs: key is the column number, value is the column's value

var bool_numeric_col = kvPairs.map{ case (x, y) => (x, isNumeric(y)) }.reduceByKey(_ && _).sortByKey() // column index as key, boolean as value (true for numeric, false for string type)
var str_cols = bool_numeric_col.filter{ case (x, y) => y == false }.map{ case (x, y) => x }
var num_cols = bool_numeric_col.filter{ case (x, y) => y == true }.map{ case (x, y) => x }
var str_col = str_cols.toArray // array of the string column indexes
var num_col = num_cols.toArray // array of the numeric column indexes

val colCount = kvPairs.map((_, 1)).reduceByKey(_ + _)
val e1 = colCount.map{ case ((x, y), z) => (x, (y, z)) }
var numPairs = e1.filter{ case (x, (y, z)) => str_col.contains(x) }

// The for loop below needs to be parallelized/optimized, as it operates on each column sequentially.
// The idea is to find the top 10, bottom 10 and number of distinct elements column-wise.
for (i <- str_col) {
  var total = numPairs.filter{ case (x, (y, z)) => x == i }.sortBy(_._2._2)
  var leastOnes = total.take(10)
  println("leastOnes for Col" + i)
  leastOnes.foreach(println)
  var maxOnes = total.sortBy(-_._2._2).take(10)
  println("maxOnes for Col" + i)
  maxOnes.foreach(println)
  println("distinct for Col" + i + " is " + total.count)
}
Let me simplify your question a bit. (A lot, actually.) We have an RDD[(Int, String)] and we want to find the top 10 most common Strings for each Int (which are all in the 0-100 range).
Instead of sorting, as in your example, it is more efficient to use the built-in RDD.top(n) method. Its run-time is linear in the size of the data, and it requires moving much less data around than a sort does.
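As a rough illustration (not part of the original answer), the sortBy + take in the question's loop could be replaced with top / takeOrdered and a custom Ordering; numPairs and the column index i below refer to the names used in the question's code:

val byCount   = Ordering.by[(Int, (String, Int)), Int](_._2._2) // order by the per-column count
val colPairs  = numPairs.filter { case (col, _) => col == i }
val maxOnes   = colPairs.top(10)(byCount)         // the 10 most frequent values of column i
val leastOnes = colPairs.takeOrdered(10)(byCount) // the 10 least frequent values of column i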
Consider the implementation of top in RDD.scala. You want to do the same, but with one priority queue (heap) per Int key. The code becomes fairly complex:
import org.apache.spark.util.BoundedPriorityQueue // Pretend it's not private.

def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
  // A heap that only keeps the top N values, so it has bounded size.
  type Heap = BoundedPriorityQueue[(Long, String)]

  // Get the word counts.
  val counts: RDD[[(Int, String), Long)] =
    rdd.map(_ -> 1L).reduceByKey(_ + _)

  // In each partition create a column -> heap map.
  val perPartition: RDD[Map[Int, Heap]] =
    counts.mapPartitions { items =>
      val heaps =
        collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
      for (((k, v), count) <- items) {
        heaps(k) += count -> v
      }
      Iterator.single(heaps)
    }

  // Merge the per-partition heap maps into one.
  val merged: Map[Int, Heap] =
    perPartition.reduce { (heaps1, heaps2) =>
      val heaps =
        collection.mutable.Map[Int, Heap].withDefault(i => new Heap(n))
      for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
        for (cv <- heap) {
          heaps(k) += cv
        }
      }
      heaps
    }

  // Discard counts, return just the top strings.
  merged.mapValues(_.map { case (count, value) => value })
}
This works, but it is painful because we have to handle all the columns at the same time. It would be easier to have one RDD per column and simply call rdd.top(10) on each of them.
Unfortunately, the naive way of splitting the RDD up into N smaller RDDs makes N passes over the data:
def split(together: RDD[(Int, String)], columns: Int): Seq[RDD[String]] = {
  together.cache // We will make N passes over this RDD.
  (0 until columns).map {
    i => together.filter { case (key, value) => key == i }.values
  }
}
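For illustration (this usage is not part of the original answer), each of the resulting per-column RDDs can then be reduced to (value, count) pairs and queried with top(10); note that this still triggers one pass over the cached data per column. The name together and the column count of 10 are placeholders:

val perColumn: Seq[RDD[String]] = split(together, 10)
val top10PerColumn: Seq[Array[(String, Long)]] = perColumn.map { col =>
  col.map(_ -> 1L)
     .reduceByKey(_ + _)                               // count the occurrences of each value
     .top(10)(Ordering.by[(String, Long), Long](_._2)) // keep the 10 most frequent ones
}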
A more efficient solution could be to write the data out into separate files by key and then load it back into separate RDDs. This is discussed in Write to multiple outputs by key Spark - one Spark job.
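A minimal sketch of that idea, based on the approach discussed in the linked question rather than on code from this answer; Hadoop's MultipleTextOutputFormat routes each record to a file named after its column index, and the output path is a placeholder:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class ColumnOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Write only the value into the output files, not the key.
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()
  // Name each output file after the record's key (the column index).
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    "col_" + key.toString
}

// kvPairs: RDD[(Int, String)] as in the question.
kvPairs.map { case (col, value) => (col.toString, value) }
       .saveAsHadoopFile("hdfs://.../by_column", classOf[String], classOf[String],
                         classOf[ColumnOutputFormat])

// Each column can then be loaded back as its own RDD[String], e.g.:
// val col0 = sc.textFile("hdfs://.../by_column/col_0*")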
Thanks to @Daniel Darabos for the answer. But there are a few errors in it:
- mixed use of Map and collection.mutable.Map
- withDefault((i: Int) => new Heap(n)) does not create a new heap when you do heaps(k) += count -> v
- mixed use of brackets
The revised code is as follows:
// import org.apache.spark.util.BoundedPriorityQueue // Pretend it's not private: copy it into your own package and import it from there.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object BoundedPriorityQueueTest {

  def top(n: Int, rdd: RDD[(Int, String)]): Map[Int, Iterable[String]] = {
    // A heap that only keeps the top N values, so it has bounded size.
    type Heap = BoundedPriorityQueue[(Long, String)]

    // Get the word counts.
    val counts: RDD[((Int, String), Long)] =
      rdd.map(_ -> 1L).reduceByKey(_ + _)

    // In each partition create a column -> heap map.
    val perPartition: RDD[collection.mutable.Map[Int, Heap]] =
      counts.mapPartitions { items =>
        val heaps =
          collection.mutable.Map[Int, Heap]() // .withDefault((i: Int) => new Heap(n))
        for (((k, v), count) <- items) {
          println("\n---")
          println("before add " + ((k, v), count) + ", the map is: ")
          println(heaps)
          if (!heaps.contains(k)) {
            println("not contains key " + k)
            heaps(k) = new Heap(n)
            println(heaps)
          }
          heaps(k) += count -> v
          println("after add " + ((k, v), count) + ", the map is: ")
          println(heaps)
        }
        println(heaps)
        Iterator.single(heaps)
      }

    // Merge the per-partition heap maps into one.
    val merged: collection.mutable.Map[Int, Heap] =
      perPartition.reduce { (heaps1, heaps2) =>
        val heaps =
          collection.mutable.Map[Int, Heap]() // .withDefault((i: Int) => new Heap(n))
        println(heaps)
        for ((k, heap) <- heaps1.toSeq ++ heaps2.toSeq) {
          for (cv <- heap) {
            // Create the heap for a key that is not in the merged map yet before adding to it.
            heaps.getOrElseUpdate(k, new Heap(n)) += cv
          }
        }
        heaps
      }

    // Discard counts, return just the top strings.
    merged.mapValues(_.map { case (count, value) => value }).toMap
  }

  def main(args: Array[String]): Unit = {
    Logger.getRootLogger().setLevel(Level.FATAL)
    val conf = new SparkConf().setAppName("word count").setMaster("local[1]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val words = sc.parallelize(List((1, "s11"), (1, "s11"), (1, "s12"), (1, "s13"), (2, "s21"), (2, "s22"), (2, "s22"), (2, "s23")))
    println("# words:" + words.count())
    val result = top(1, words)
    println("\n--result:")
    println(result)
    sc.stop()
    print("DONE")
  }
}