Split Spark dataframe and calculate average based on one column value

I have two dataframes. The first one, classRecord, has 10 different entries like the following:

Class, Calculation
first, Average
Second, Sum
Third, Average

The second dataframe, studentRecord, has around 50K entries like the following:

Name, height, Camp, Class
Shae, 152, yellow, first
Joe, 140, yellow, first
Mike, 149, white, first
Anne, 142, red, first
Tim, 154, red, Second
Jake, 153, white, Second
Sherley, 153, white, Second

From the second dataframe, I want to perform a calculation on height based on the class type (average for class first, sum for class Second, and so on), broken down per camp (e.g. if the class is first, the average for yellow, white, etc. separately). I tried the following code:

// function to calculate the average height per name
def averageOnName(splitFrame: org.apache.spark.sql.DataFrame): Array[(String, Double)] = {
  val pairedRDD: RDD[(String, Double)] = splitFrame.select($"Name", $"height".cast("double")).as[(String, Double)].rdd
  pairedRDD.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(y => 1.0 * y._1 / y._2).collect
}

//required schema for further modifications
val schema = StructType(
StructField("name", StringType, false) ::
StructField("avg", DoubleType, false) :: Nil)

// for-each loop over each class type
classRecord.rdd.foreach { row =>
  // filter students based on camps
  val campYellow = studentRecord.filter($"Camp" === "yellow")
  val campWhite = studentRecord.filter($"Camp" === "white")
  val campRed = studentRecord.filter($"Camp" === "red")

  // since I know that the calculation for class first is average, showing the calculation only for class first
  val avgcampYellow = averageOnName(campYellow)
  val avgcampWhite = averageOnName(campWhite)
  val avgcampRed = averageOnName(campRed)

  // conversion of each RDD to a frame, then a union of all
  val rddYellow = sc.parallelize(avgcampYellow).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  val dfYellow = sqlContext.createDataFrame(rddYellow, schema)
  val rddWhite = sc.parallelize(avgcampWhite).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  val dfWhite = sqlContext.createDataFrame(rddWhite, schema)
  // union of yellow and white camp data
  val dfYellWhite = dfYellow.union(dfWhite)
  val rddRed = sc.parallelize(avgcampRed).map(x => org.apache.spark.sql.Row(x._1, x._2.asInstanceOf[Number].doubleValue()))
  val dfRed = sqlContext.createDataFrame(rddRed, schema)
  // union of yellow, white and red camp data
  val dfYellWhiteRed = dfYellWhite.union(dfRed)
  // other modifications and final result to Hive
}

I am struggling here:

  1. Hardcoding yellow, red and white; there could be other camp types as well.
  2. The dataframe is currently filtered many times, which could be improved.
  3. I can't figure out how to perform a different calculation according to the class's calculation type (i.e. use sum/average depending on the class type).

Any help is appreciated.

You can simply do both the average and sum calculations for all combinations of Class/Camp, then parse the classRecord dataframe separately and extract what you need. This is easy to do in Spark with the groupBy() method, aggregating the values.

Using your example dataframe:

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions.{sum, avg, collect_list}

studentRecord.show()

+-------+------+------+------+
|   Name|height|  Camp| Class|
+-------+------+------+------+
|   Shae|   152|yellow| first|
|    Joe|   140|yellow| first|
|   Mike|   149| white| first|
|   Anne|   142|   red| first|
|    Tim|   154|   red|Second|
|   Jake|   153| white|Second|
|Sherley|   153| white|Second|
+-------+------+------+------+

val df = studentRecord.groupBy("Class", "Camp")
  .agg(
    sum($"height").as("Sum"), 
    avg($"height").as("Average"), 
    collect_list($"Name").as("Names")
  )
df.show()

+------+------+---+-------+---------------+
| Class|  Camp|Sum|Average|          Names|
+------+------+---+-------+---------------+
| first| white|149|  149.0|         [Mike]|
| first|   red|142|  142.0|         [Anne]|
|Second|   red|154|  154.0|          [Tim]|
|Second| white|306|  153.0|[Jake, Sherley]|
| first|yellow|292|  146.0|    [Shae, Joe]|
+------+------+---+-------+---------------+

Once this is done, you can simply check your first classRecord dataframe and pick out the rows you need. An example of how it can look, which can be changed to what you need:

// Collects the dataframe as an Array[(String, String)]
import org.apache.spark.sql.Row
val classRecs = classRecord.collect().map{case Row(clas: String, calc: String) => (clas, calc)}

for (classRec <- classRecs){
  val clas = classRec._1
  val calc = classRec._2

  // Matches which calculation you want to do; a default case avoids a MatchError
  val df2 = calc match {
    case "Average" => df.filter($"Class" === clas).select("Class", "Camp", "Average")
    case "Sum" => df.filter($"Class" === clas).select("Class", "Camp", "Sum")
    case other => throw new IllegalArgumentException(s"Unknown calculation: $other")
  }

// Do something with df2
}
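If you'd rather avoid collecting classRecord to the driver and looping, one alternative sketch (assuming the column names Class, Calculation, Sum and Average shown above) is to join the aggregated frame with classRecord and pick the requested statistic per row with when/otherwise:

```scala
import org.apache.spark.sql.functions.{when, col}

// Join the per-(Class, Camp) aggregates with classRecord on Class,
// then select the requested statistic row by row; no driver-side loop needed.
val result = df.join(classRecord, Seq("Class"))
  .withColumn("Value",
    when(col("Calculation") === "Average", col("Average"))
      .otherwise(col("Sum")))
  .select("Class", "Camp", "Value")
```

This keeps everything as one dataframe operation, so it also scales if classRecord grows beyond 10 entries.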

Hope it helps!