How to transpose dataframe in Spark 1.5 (no pivot operator available)?

I want to transpose the following table using Spark Scala, without the pivot function.

I am using Spark 1.5.1, and the pivot function is not supported in 1.5.1. Please suggest a suitable method to transpose the following table:

Customer Day   Sales
1        Mon    12
1        Tue    10
1        Thu    15
1        Fri     2
2        Sun    10
2        Wed     5
2        Thu     4
2        Fri     3

Output table:

Customer Sun Mon Tue Wed Thu Fri
       1   0  12  10   0  15   2
       2  10   0   0   5   4   3

The following code does not work because I am using Spark 1.5.1, and the pivot function is only available from Spark 1.6 onwards:

    val trans = Cust_Sales.groupBy("Customer").pivot("Day").sum("Sales")

Not sure how efficient it is, but you can use collect to get all the distinct days, add a column for each of them, and then group by and sum:

// get distinct days from data (this assumes there are not too many of them):
val days: Array[String] = df.select("Day")
    .distinct()
    .collect()
    .map(_.getAs[String]("Day"))

// add column for each day with the Sale value if days match:
val withDayColumns = days.foldLeft(df) { 
    case (data, day) => data.selectExpr("*", s"IF(Day = '$day', Sales, 0) AS $day")
}

// wrap it up 
val result = withDayColumns
   .drop("Day")
   .drop("Sales")
   .groupBy("Customer")
   .sum(days: _*)

result.show()

This prints (almost) what you want:

+--------+--------+--------+--------+--------+--------+--------+
|Customer|sum(Tue)|sum(Thu)|sum(Sun)|sum(Fri)|sum(Mon)|sum(Wed)|
+--------+--------+--------+--------+--------+--------+--------+
|       1|      10|      15|       0|       2|      12|       0|
|       2|       0|       4|      10|       3|       0|       5|
+--------+--------+--------+--------+--------+--------+--------+

I'll leave the renaming and reordering of the columns to you, if you need it.
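
If you do want that last step, here is a minimal sketch, assuming the days array and result from the snippet above (weekdayOrder is my own helper, not part of the original answer):

// strip the sum(...) wrappers and put the columns back in weekday order
val weekdayOrder = Seq("Sun", "Mon", "Tue", "Wed", "Thu", "Fri")
val renamed = days.foldLeft(result) {
    case (data, day) => data.withColumnRenamed(s"sum($day)", day)
}
renamed.select("Customer", weekdayOrder.filter(days.contains): _*).show()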

If you are working with Python, the code below might help. Say you want to transpose the Spark DataFrame df:

# collect to the driver, transpose with pandas, then rebuild a Spark DataFrame
pandas_df = df.toPandas().transpose().reset_index()
transposed_df = sqlContext.createDataFrame(pandas_df)
transposed_df.show()
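
Keep in mind that toPandas() collects the entire DataFrame to the driver, so this only works when the data fits in driver memory. Also note that this is a full transpose (rows become columns) rather than a pivot with aggregation like the solutions above.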

Consider a dataframe with 6 columns, where we want to group by the first 4 columns and pivot on col5 while aggregating on col6 (say, a sum). With Spark 1.6+ you would simply write:

val pivotedDf = df_to_pivot
    .groupBy(col1,col2,col3,col4)
    .pivot(col5)
    .agg(sum(col6))
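
As a side note, if you already know the values of col5 up front, Spark 1.6 also has a pivot overload that takes them explicitly and skips the extra pass that computes the distinct values (knownCol5Values below is an assumed Seq of those values):

val pivotedDfExplicit = df_to_pivot
    .groupBy(col1, col2, col3, col4)
    .pivot(col5, knownCol5Values)  // knownCol5Values: Seq[Any], assumed known up front
    .agg(sum(col6))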

And here is code that produces the same output without using the built-in pivot function, so it also works on Spark 1.5:

import scala.collection.SortedMap
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DateType, DoubleType, StringType, StructField, StructType}

//Extracting the col5 distinct values to create the new columns
val distinctCol5Values = df_to_pivot
    .select(col(col5))
    .distinct
    .sort(col5)  // ensure that the output columns are in a consistent logical order
    .map(_.getString(0))
    .collect()
    .toSeq

//Grouping the rows by the first four (non-pivoted) columns, col1-col4
val pivotedAgg = df_to_pivot.rdd
    .groupBy { row => (row.getString(col1Index),
        row.getDate(col2Index),
        row.getDate(col3Index),
        row.getString(col4Index)) }

//Template of (column name -> 0.0) entries for the columns that will be created;
//a SortedMap keeps the column order consistent with distinctCol5Values
val zeroedCol5Values = SortedMap(distinctCol5Values.map(ft => (ft, 0.0)): _*)

//Pivoting the data on col5 by unfolding each group. A fresh local map is used
//per group rather than mutating a var captured by the closure, which would be
//fragile once the closure is shipped to the executors.
val pivotedRDD = pivotedAgg.map { case (key, rows) =>
    var rowValues = zeroedCol5Values
    rows.foreach { row =>
        //Accumulate col6 into the slot of this row's col5 value;
        //change this part according to the aggregation you want
        val col5Value = row.getString(col5Index)
        rowValues = rowValues.updated(col5Value,
            rowValues.getOrElse(col5Value, 0.0) + row.getDouble(col6Index))
    }
    Row.fromSeq(Seq(key._1, key._2, key._3, key._4) ++ rowValues.values.toSeq)
}

//Constructing the StructFields for the new columns
val colTypesStruct = distinctCol5Values.map(colName => StructField(colName, DoubleType))
//Prepending the StructFields of the first four columns
val opStructType = StructType(Seq(StructField(col1Name, StringType),
                                  StructField(col2Name, DateType),
                                  StructField(col3Name, DateType),
                                  StructField(col4Name, StringType)) ++ colTypesStruct)

//Creating the final data frame
val pivotedDF = sqlContext.createDataFrame(pivotedRDD,opStructType)
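
For reference, the snippet above leaves a few bindings implicit (col1 through col6, the *Index offsets and the *Name strings). A hypothetical setup for a six-column DataFrame could look like this:

// hypothetical bindings, assuming df_to_pivot has exactly six columns
// with types (String, Date, Date, String, String, Double)
val Seq(col1, col2, col3, col4, col5, col6) = df_to_pivot.columns.toSeq
val Seq(col1Name, col2Name, col3Name, col4Name) = Seq(col1, col2, col3, col4)
val Seq(col1Index, col2Index, col3Index, col4Index, col5Index, col6Index) =
    Seq(0, 1, 2, 3, 4, 5)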