在 Spark SQL 中将列转换为行
Convert columns to rows in Spark SQL
我有一些这样的数据:
ID
Value1
Value2
Value40
101
3
520
2001
102
29
530
2020
我想获取此数据并将其转换为 KV 样式对
ID
ValueVv
ValueDesc
101
3
Value1
101
520
Value2
101
2001
Value40
我认为这是一个枢轴,但我想不出它在代码中需要是什么样子。
我正在尝试在 PySQL 中求解,但也在 Python DataFrame 中求解,因为我正在使用 Spark。
我可以很容易地使用 SQL 将每一列联合到一个输出中,但我希望有更有效的方法吗?
我已经将 melt
视为一个选项,并且 stack
。但我不确定如何有效地做到这一点。
您可以按如下方式使用flatmap
:
val schema = df.schema
val df2 = df.flatMap(row => {
val id = row.getString(0)
(1 until row.size).map(i => {
(id, schema(i).name, row.getString(i))
})
}).toDF("ID", "ValueVv", "ValueDesc")
df2.show()
+---+-------+---------+
| ID|ValueVv|ValueDesc|
+---+-------+---------+
|101| Value1| 3|
|101| Value2| 520|
|101|Value40| 2001|
|102| Value1| 29|
|102| Value2| 530|
|102|Value40| 2020|
+---+-------+---------+
或 stack
来自此 的函数。
到目前为止,我已经这样做了,但我不想使用 pandas.. 而是仅使用 spark 数据框来实现它。
import pandas as pd
def main():
data={'AnID':[2001,2002,2003,2004],
'Name':['adam','jane','Sarah','Ryan'],
'Age':[23,22,21,24],
'Age1':[24,52,51,264],
'Age2':[263,262,261,264]}
df=pd.DataFrame(data)
#Iterate the DataFrame so that we can pivot the "columns" into Rows
schema = df.columns #gives me the names of the columns
df.index[0]
#loop Through the id to pivot on (assume it's the first one in the df)
j = 0
df2=pd.DataFrame()
while j < schema.size:
curvalid = schema[j]
idname = schema[j] #get each element of the array
vval = df[idname].values #Grab all the values for the given "column"
#then get the data for that array element and populate a new object
i = 0
while i < vval.size:
df3=pd.DataFrame({'DemoDesc' : curvalid, 'DemoID' : vval[i]}, index=[i])
df2 = df2.append(df3,ignore_index=True)
i = i + 1;
j = j + 1;
print(df2) #print the dataframe
return;
main()
它与 pivot 相反 - 它被称为 unpivot。
在 Spark 中,unpivoting 是使用 stack
函数实现的。
使用 PySpark,如果您没有很多列,您可以这样做:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[(101, 3, 520, 2001),
(102, 29, 530, 2020)],
['ID', 'Value1', 'Value2', 'Value40'])
df = df.select(
"ID",
F.expr("stack(3, Value1, 'Value1', Value2, 'Value2', Value40, 'Value40') as (ValueVv, ValueDesc)")
)
从您的示例中我看到您可能有很多列。在这种情况下,您可以使用这样的东西:
cols_to_unpivot = [f"`{c}`, \'{c}\'" for c in df.columns if c != 'ID']
stack_string = ", ".join(cols_to_unpivot)
df = df.select(
"ID",
F.expr(f"stack({len(cols_to_unpivot)}, {stack_string}) as (ValueVv, ValueDesc)")
)
两个版本的示例数据return
+---+-------+---------+
| ID|ValueVv|ValueDesc|
+---+-------+---------+
|101| 3| Value1|
|101| 520| Value2|
|101| 2001| Value40|
|102| 29| Value1|
|102| 530| Value2|
|102| 2020| Value40|
+---+-------+---------+
我有一些这样的数据:
ID | Value1 | Value2 | Value40 |
---|---|---|---|
101 | 3 | 520 | 2001 |
102 | 29 | 530 | 2020 |
我想获取此数据并将其转换为 KV 样式对
ID | ValueVv | ValueDesc |
---|---|---|
101 | 3 | Value1 |
101 | 520 | Value2 |
101 | 2001 | Value40 |
我认为这是一个枢轴,但我想不出它在代码中需要是什么样子。
我正在尝试在 PySQL 中求解,但也在 Python DataFrame 中求解,因为我正在使用 Spark。
我可以很容易地使用 SQL 将每一列联合到一个输出中,但我希望有更有效的方法吗?
我已经将 melt
视为一个选项,并且 stack
。但我不确定如何有效地做到这一点。
您可以按如下方式使用flatmap
:
val schema = df.schema
val df2 = df.flatMap(row => {
val id = row.getString(0)
(1 until row.size).map(i => {
(id, schema(i).name, row.getString(i))
})
}).toDF("ID", "ValueVv", "ValueDesc")
df2.show()
+---+-------+---------+
| ID|ValueVv|ValueDesc|
+---+-------+---------+
|101| Value1| 3|
|101| Value2| 520|
|101|Value40| 2001|
|102| Value1| 29|
|102| Value2| 530|
|102|Value40| 2020|
+---+-------+---------+
或 stack
来自此
到目前为止,我已经这样做了,但我不想使用 pandas.. 而是仅使用 spark 数据框来实现它。
import pandas as pd
def main():
data={'AnID':[2001,2002,2003,2004],
'Name':['adam','jane','Sarah','Ryan'],
'Age':[23,22,21,24],
'Age1':[24,52,51,264],
'Age2':[263,262,261,264]}
df=pd.DataFrame(data)
#Iterate the DataFrame so that we can pivot the "columns" into Rows
schema = df.columns #gives me the names of the columns
df.index[0]
#loop Through the id to pivot on (assume it's the first one in the df)
j = 0
df2=pd.DataFrame()
while j < schema.size:
curvalid = schema[j]
idname = schema[j] #get each element of the array
vval = df[idname].values #Grab all the values for the given "column"
#then get the data for that array element and populate a new object
i = 0
while i < vval.size:
df3=pd.DataFrame({'DemoDesc' : curvalid, 'DemoID' : vval[i]}, index=[i])
df2 = df2.append(df3,ignore_index=True)
i = i + 1;
j = j + 1;
print(df2) #print the dataframe
return;
main()
它与 pivot 相反 - 它被称为 unpivot。
在 Spark 中,unpivoting 是使用 stack
函数实现的。
使用 PySpark,如果您没有很多列,您可以这样做:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[(101, 3, 520, 2001),
(102, 29, 530, 2020)],
['ID', 'Value1', 'Value2', 'Value40'])
df = df.select(
"ID",
F.expr("stack(3, Value1, 'Value1', Value2, 'Value2', Value40, 'Value40') as (ValueVv, ValueDesc)")
)
从您的示例中我看到您可能有很多列。在这种情况下,您可以使用这样的东西:
cols_to_unpivot = [f"`{c}`, \'{c}\'" for c in df.columns if c != 'ID']
stack_string = ", ".join(cols_to_unpivot)
df = df.select(
"ID",
F.expr(f"stack({len(cols_to_unpivot)}, {stack_string}) as (ValueVv, ValueDesc)")
)
两个版本的示例数据return
+---+-------+---------+
| ID|ValueVv|ValueDesc|
+---+-------+---------+
|101| 3| Value1|
|101| 520| Value2|
|101| 2001| Value40|
|102| 29| Value1|
|102| 530| Value2|
|102| 2020| Value40|
+---+-------+---------+