spark 中列表值的计数 - 数据框
Count of List values in spark - dataframe
在 cassandra 中,我有一个列表列类型。我是 spark 和 scala 的新手,不知道从哪里开始。
在 spark 中,我想计算每个值,是否可以这样做。
下面是数据框
+--------------------+------------+
| id| data|
+--------------------+------------+
|53e5c3b0-8c83-11e...| [b, c]|
|508c1160-8c83-11e...| [a, b]|
|4d16c0c0-8c83-11e...| [a, b, c]|
|5774dde0-8c83-11e...|[a, b, c, d]|
+--------------------+------------+
我希望输出为
+--------------------+------------+
| value | count |
+--------------------+------------+
|a | 3 |
|b | 4 |
|c | 3 |
|d | 1 |
+--------------------+------------+
火花版本:1.4
你需要这样的东西(来自Apache Spark Examples):
val textFile = sc.textFile("hdfs://...")
val counts = textFile
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
猜测你已经有了对,.reduceByKey(_ + _) 将 return 你需要的。
您也可以在 spark shell 中尝试这样的操作:
sc.parallelize(Array[Integer](1,1,1,2,2),3).map(x=>(x,1)).reduceByKey(_+_).foreach(println)
给你:
scala> val rdd = sc.parallelize(
Seq(
("53e5c3b0-8c83-11e", Array("b", "c")),
("53e5c3b0-8c83-11e1", Array("a", "b")),
("53e5c3b0-8c83-11e2", Array("a", "b", "c")),
("53e5c3b0-8c83-11e3", Array("a", "b", "c", "d"))))
// rdd: org.apache.spark.rdd.RDD[(String, Array[String])] = ParallelCollectionRDD[22] at parallelize at <console>:27
scala> rdd.flatMap(_._2).map((_, 1)).reduceByKey(_ + _)
// res11: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[21] at reduceByKey at <console>:30
scala> rdd.flatMap(_._2).map((_,1)).reduceByKey(_ + _).collect
// res16: Array[(String, Int)] = Array((a,3), (b,4), (c,3), (d,1))
对于 DataFrame API 这实际上也很容易:
scala> val df = rdd.toDF("id", "data")
// res12: org.apache.spark.sql.DataFrame = ["id": string, "data": array<string>]
scala> df.select(explode($"data").as("value")).groupBy("value").count.show
// +-----+-----+
// |value|count|
// +-----+-----+
// | d| 1|
// | c| 3|
// | b| 4|
// | a| 3|
// +-----+-----+
在 cassandra 中,我有一个列表列类型。我是 spark 和 scala 的新手,不知道从哪里开始。 在 spark 中,我想计算每个值,是否可以这样做。 下面是数据框
+--------------------+------------+
| id| data|
+--------------------+------------+
|53e5c3b0-8c83-11e...| [b, c]|
|508c1160-8c83-11e...| [a, b]|
|4d16c0c0-8c83-11e...| [a, b, c]|
|5774dde0-8c83-11e...|[a, b, c, d]|
+--------------------+------------+
我希望输出为
+--------------------+------------+
| value | count |
+--------------------+------------+
|a | 3 |
|b | 4 |
|c | 3 |
|d | 1 |
+--------------------+------------+
火花版本:1.4
你需要这样的东西(来自Apache Spark Examples):
val textFile = sc.textFile("hdfs://...")
val counts = textFile
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
猜测你已经有了对,.reduceByKey(_ + _) 将 return 你需要的。
您也可以在 spark shell 中尝试这样的操作:
sc.parallelize(Array[Integer](1,1,1,2,2),3).map(x=>(x,1)).reduceByKey(_+_).foreach(println)
给你:
scala> val rdd = sc.parallelize(
Seq(
("53e5c3b0-8c83-11e", Array("b", "c")),
("53e5c3b0-8c83-11e1", Array("a", "b")),
("53e5c3b0-8c83-11e2", Array("a", "b", "c")),
("53e5c3b0-8c83-11e3", Array("a", "b", "c", "d"))))
// rdd: org.apache.spark.rdd.RDD[(String, Array[String])] = ParallelCollectionRDD[22] at parallelize at <console>:27
scala> rdd.flatMap(_._2).map((_, 1)).reduceByKey(_ + _)
// res11: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[21] at reduceByKey at <console>:30
scala> rdd.flatMap(_._2).map((_,1)).reduceByKey(_ + _).collect
// res16: Array[(String, Int)] = Array((a,3), (b,4), (c,3), (d,1))
对于 DataFrame API 这实际上也很容易:
scala> val df = rdd.toDF("id", "data")
// res12: org.apache.spark.sql.DataFrame = ["id": string, "data": array<string>]
scala> df.select(explode($"data").as("value")).groupBy("value").count.show
// +-----+-----+
// |value|count|
// +-----+-----+
// | d| 1|
// | c| 3|
// | b| 4|
// | a| 3|
// +-----+-----+