Spark DataFrame/Dataset: find the most common value for each key efficiently
Question:
I have a problem finding the most common value for each key in Spark (using Scala). I have already done it with RDDs, but I don't know how to do it efficiently with DataFrame/Dataset (Spark SQL).
The dataset looks like:
key1 = value_a
key1 = value_b
key1 = value_b
key2 = value_a
key2 = value_c
key2 = value_c
key3 = value_a
After the Spark transformations, the output should map each key to its most common value.
Output:
key1 = value_b
key2 = value_c
key3 = value_a
What I have tried so far:
RDD
I tried to map and reduce over ((key, value), count) pairs in an RDD, which is the logical approach, but I couldn't translate it into Spark SQL (DataFrame/Dataset), and I want to keep shuffling across the network to a minimum.
Here is my RDD code:
import org.apache.spark.{SparkConf, SparkContext}

val data = List(
  "key1,value_a",
  "key1,value_b",
  "key1,value_b",
  "key2,value_a",
  "key2,value_c",
  "key2,value_c",
  "key3,value_a"
)

val sparkConf = new SparkConf().setMaster("local").setAppName("example")
val sc = new SparkContext(sparkConf)

val lineRDD = sc.parallelize(data)

val pairedRDD = lineRDD.map { line =>
  val fields = line.split(",")
  (fields(0), fields(1))                          // (key, value)
}

val pairCountRDD = pairedRDD.map {                // ((key, value), 1) so the pairs can be counted
  case (key, value) => ((key, value), 1)
}

val sumRDD = pairCountRDD.reduceByKey(_ + _)      // occurrences of each (key, value)

val resultsRDD = sumRDD.map {
  case ((key, value), count) => (key, (value, count))
}.groupByKey.map {
  case (key, valList) => (key, valList.maxBy(_._2))   // keep the most frequent value per key
}

resultsRDD.collect().foreach(println)
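For reference, on the sample data this should print one tuple per key along these lines (an assumption about the exact console formatting; the order of the lines may vary):

(key1,(value_b,2))
(key2,(value_c,2))
(key3,(value_a,1))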
DataFrame, using windowing: I am trying to use Window.partitionBy("key", "value") to aggregate a count over the window, and then do the sorting and agg() separately.
From what I understand of your question, you can do the following.
First you have to read the data and convert it into a DataFrame:
val df = sc.textFile("path to the data file")              // read the file line by line
  .map(line => line.split("="))                            // split each line on "="
  .map(array => (array(0).trim, array(1).trim))            // build (key, value) tuples
  .toDF("key", "value")                                    // convert the RDD to a DataFrame (requires import sqlContext.implicits._)
which gives:
+----+-------+
|key |value |
+----+-------+
|key1|value_a|
|key1|value_b|
|key1|value_b|
|key2|value_a|
|key2|value_c|
|key2|value_c|
|key3|value_a|
+----+-------+
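If you are on Spark 2.x or later you can also read the data through SparkSession instead of SparkContext; a rough sketch, assuming a local session (the builder settings and the file path are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("most-common-value")
  .getOrCreate()
import spark.implicits._

val df = spark.read.textFile("path to the data file")      // Dataset[String], one line per record
  .map(_.split("="))                                       // split each line on "="
  .map(array => (array(0).trim, array(1).trim))            // (key, value) tuples
  .toDF("key", "value")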
The next step is to count how many times each value is repeated for each key and to select, per key, the value with the highest count. This can be done with a Window function and aggregations as follows:
import org.apache.spark.sql.expressions._                  // Window library
import org.apache.spark.sql.functions._                    // built-in functions

def windowSpec = Window.partitionBy("key", "value")        // window frame for the aggregation

df.withColumn("count", count("value").over(windowSpec))    // count how often each value repeats within its (key, value) group
  .orderBy($"count".desc)                                  // order the dataframe by count, descending
  .groupBy("key")                                          // group by key
  .agg(first("value").as("value"))                         // take the first row of each key, i.e. the one with the highest count
So the final output should be:
+----+-------+
|key |value |
+----+-------+
|key3|value_a|
|key1|value_b|
|key2|value_c|
+----+-------+
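Note that orderBy followed by groupBy/first does not strictly guarantee which row is picked. If you want that to be explicit, one possible variant (my own sketch, not part of the code above, reusing the imports already shown) is to rank the values inside each key with row_number and keep the top one:

val counted = df.groupBy("key", "value").agg(count("value").as("count"))   // occurrences per (key, value)
val byCountDesc = Window.partitionBy("key").orderBy($"count".desc)         // rank values within each key
val mostCommon = counted
  .withColumn("rn", row_number().over(byCountDesc))
  .filter($"rn" === 1)                                                     // keep the most frequent value per key
  .select("key", "value")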
What about using groupBy?
val maxFreq = udf((values: Seq[String]) =>                 // values collected for one key
  values.groupBy(identity).mapValues(_.size).maxBy(_._2)._1   // pick the value with the highest count
)

df.groupBy("key")
  .agg(collect_list("value") as "valueList")               // collect every value seen for the key
  .withColumn("mostFrequentValue", maxFreq(col("valueList")))
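Calling .show(false) on the result should, for the sample data, give something roughly like this (an assumption about the exact formatting; row order may differ):

+----+---------------------------+-----------------+
|key |valueList                  |mostFrequentValue|
+----+---------------------------+-----------------+
|key1|[value_a, value_b, value_b]|value_b          |
|key2|[value_a, value_c, value_c]|value_c          |
|key3|[value_a]                  |value_a          |
+----+---------------------------+-----------------+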