如何取消旋转大型 spark 数据框?

How to unpivot a large spark dataframe?

当列数相当少并且列名可以硬编码时,我已经看到了一些针对 unpivot spark 数据框的解决方案。您是否有可扩展的解决方案来反透视具有大量列的数据框?

下面是一道玩具题

输入:

  val df = Seq(
    (1,1,1,0),
    (2,0,0,1)    
  ).toDF("ID","A","B","C")

+---+--------+----+
| ID|  A | B | C  |
+---+--------+-----
|  1|  1 | 1 | 0  |
|  2|  0 | 0 | 1  |
+---+----------+--+

预期结果:

+---+-----+-----+
| ID|names|count|
+---+-----------|
|  1|  A  |  1  |
|  1|  B  |  1  |
|  1|  C  |  0  |
|  2|  A  |  0  |
|  2|  B  |  0  |
|  2|  C  |  1  |
+---+-----------+

该解决方案应该适用于有 N 列要反透视的数据集,其中 N 很大(比如 100 列)。

这应该有效,我假设您知道要取消透视的列的列表

import org.apache.spark.sql.{functions => func, _}

val df = Seq(
    (1,1,1,0),
    (2,0,0,1)    
  ).toDF("ID","A","B","C")

val cols = Seq("A", "B", "C")

df.select(
    $"ID",
    func.explode(
        func.array(
            cols.map(
                col =>
                    func.struct(    
                        func.lit(col).alias("names"),
                        func.col(col).alias("count")
                    )
            ): _*
        )
    ).alias("v")
)
.selectExpr("ID", "v.*")

这可以在纯 spark Sql 中通过堆叠列来完成。

下面是 pyspark 中的一个示例,可以很容易地适应 Scala。 python 代码仅与根据相关字段动态构建 Sql 相关。我经常使用这种方法。

from pyspark.sql.types import * 

df = spark.createDataFrame([
  {"id" : 1, "A" : 1, "B" : 1, "C" : 0},
  {"id" : 2, "A" : 0, "B" : 0, "C" : 1}
],
StructType([StructField("id", IntegerType()), StructField("A", IntegerType()),StructField("B", IntegerType()) , StructField("C", IntegerType())]))

def features_to_eav(df, subset=None):

  relevant_columns = subset if subset else df.columns
  n = len(relevant_columns)
  cols_to_stack = ", ".join(['\'{c}\', {c}'.format(c=c) for c in relevant_columns]) 
  stack_expression = "stack({}, {}) as (name, value)".format(n, cols_to_stack)

  fetures_to_check_df = df.select(*(["id"] + relevant_columns)).createOrReplaceTempView("features_to_check")

  sql = "select id, {} from features_to_check".format(stack_expression)
  print ("Stacking sql:", sql)
  return spark.sql(sql)

features_to_eav(df, ["A", "B", "C"]).show()

输出(注意构造sql):

Stacking sql: select id, stack(3, 'A', A, 'B', B, 'C', C) as (name, value) from features_to_check
+---+----+-----+
| id|name|value|
+---+----+-----+
|  1|   A|    1|
|  1|   B|    1|
|  1|   C|    0|
|  2|   A|    0|
|  2|   B|    0|
|  2|   C|    1|
+---+----+-----+