如何取消旋转大型 spark 数据框?
How to unpivot a large spark dataframe?
当列数相当少并且列名可以硬编码时,我已经看到了一些针对 unpivot
spark 数据框的解决方案。您是否有可扩展的解决方案来反透视具有大量列的数据框?
下面是一道玩具题
输入:
val df = Seq(
(1,1,1,0),
(2,0,0,1)
).toDF("ID","A","B","C")
+---+--------+----+
| ID| A | B | C |
+---+--------+-----
| 1| 1 | 1 | 0 |
| 2| 0 | 0 | 1 |
+---+----------+--+
预期结果:
+---+-----+-----+
| ID|names|count|
+---+-----------|
| 1| A | 1 |
| 1| B | 1 |
| 1| C | 0 |
| 2| A | 0 |
| 2| B | 0 |
| 2| C | 1 |
+---+-----------+
该解决方案应该适用于有 N 列要反透视的数据集,其中 N 很大(比如 100 列)。
这应该有效,我假设您知道要取消透视的列的列表
import org.apache.spark.sql.{functions => func, _}
val df = Seq(
(1,1,1,0),
(2,0,0,1)
).toDF("ID","A","B","C")
val cols = Seq("A", "B", "C")
df.select(
$"ID",
func.explode(
func.array(
cols.map(
col =>
func.struct(
func.lit(col).alias("names"),
func.col(col).alias("count")
)
): _*
)
).alias("v")
)
.selectExpr("ID", "v.*")
这可以在纯 spark Sql 中通过堆叠列来完成。
下面是 pyspark 中的一个示例,可以很容易地适应 Scala。 python 代码仅与根据相关字段动态构建 Sql 相关。我经常使用这种方法。
from pyspark.sql.types import *
df = spark.createDataFrame([
{"id" : 1, "A" : 1, "B" : 1, "C" : 0},
{"id" : 2, "A" : 0, "B" : 0, "C" : 1}
],
StructType([StructField("id", IntegerType()), StructField("A", IntegerType()),StructField("B", IntegerType()) , StructField("C", IntegerType())]))
def features_to_eav(df, subset=None):
relevant_columns = subset if subset else df.columns
n = len(relevant_columns)
cols_to_stack = ", ".join(['\'{c}\', {c}'.format(c=c) for c in relevant_columns])
stack_expression = "stack({}, {}) as (name, value)".format(n, cols_to_stack)
fetures_to_check_df = df.select(*(["id"] + relevant_columns)).createOrReplaceTempView("features_to_check")
sql = "select id, {} from features_to_check".format(stack_expression)
print ("Stacking sql:", sql)
return spark.sql(sql)
features_to_eav(df, ["A", "B", "C"]).show()
输出(注意构造sql):
Stacking sql: select id, stack(3, 'A', A, 'B', B, 'C', C) as (name, value) from features_to_check
+---+----+-----+
| id|name|value|
+---+----+-----+
| 1| A| 1|
| 1| B| 1|
| 1| C| 0|
| 2| A| 0|
| 2| B| 0|
| 2| C| 1|
+---+----+-----+
当列数相当少并且列名可以硬编码时,我已经看到了一些针对 unpivot
spark 数据框的解决方案。您是否有可扩展的解决方案来反透视具有大量列的数据框?
下面是一道玩具题
输入:
val df = Seq(
(1,1,1,0),
(2,0,0,1)
).toDF("ID","A","B","C")
+---+--------+----+
| ID| A | B | C |
+---+--------+-----
| 1| 1 | 1 | 0 |
| 2| 0 | 0 | 1 |
+---+----------+--+
预期结果:
+---+-----+-----+
| ID|names|count|
+---+-----------|
| 1| A | 1 |
| 1| B | 1 |
| 1| C | 0 |
| 2| A | 0 |
| 2| B | 0 |
| 2| C | 1 |
+---+-----------+
该解决方案应该适用于有 N 列要反透视的数据集,其中 N 很大(比如 100 列)。
这应该有效,我假设您知道要取消透视的列的列表
import org.apache.spark.sql.{functions => func, _}
val df = Seq(
(1,1,1,0),
(2,0,0,1)
).toDF("ID","A","B","C")
val cols = Seq("A", "B", "C")
df.select(
$"ID",
func.explode(
func.array(
cols.map(
col =>
func.struct(
func.lit(col).alias("names"),
func.col(col).alias("count")
)
): _*
)
).alias("v")
)
.selectExpr("ID", "v.*")
这可以在纯 spark Sql 中通过堆叠列来完成。
下面是 pyspark 中的一个示例,可以很容易地适应 Scala。 python 代码仅与根据相关字段动态构建 Sql 相关。我经常使用这种方法。
from pyspark.sql.types import *
df = spark.createDataFrame([
{"id" : 1, "A" : 1, "B" : 1, "C" : 0},
{"id" : 2, "A" : 0, "B" : 0, "C" : 1}
],
StructType([StructField("id", IntegerType()), StructField("A", IntegerType()),StructField("B", IntegerType()) , StructField("C", IntegerType())]))
def features_to_eav(df, subset=None):
relevant_columns = subset if subset else df.columns
n = len(relevant_columns)
cols_to_stack = ", ".join(['\'{c}\', {c}'.format(c=c) for c in relevant_columns])
stack_expression = "stack({}, {}) as (name, value)".format(n, cols_to_stack)
fetures_to_check_df = df.select(*(["id"] + relevant_columns)).createOrReplaceTempView("features_to_check")
sql = "select id, {} from features_to_check".format(stack_expression)
print ("Stacking sql:", sql)
return spark.sql(sql)
features_to_eav(df, ["A", "B", "C"]).show()
输出(注意构造sql):
Stacking sql: select id, stack(3, 'A', A, 'B', B, 'C', C) as (name, value) from features_to_check
+---+----+-----+
| id|name|value|
+---+----+-----+
| 1| A| 1|
| 1| B| 1|
| 1| C| 0|
| 2| A| 0|
| 2| B| 0|
| 2| C| 1|
+---+----+-----+