在 Spark SQL 中将列转换为行

Convert columns to rows in Spark SQL

我有一些这样的数据:

ID Value1 Value2 Value40
101 3 520 2001
102 29 530 2020

我想获取此数据并将其转换为 KV 样式对

ID ValueVv ValueDesc
101 3 Value1
101 520 Value2
101 2001 Value40

我认为这是一个枢轴,但我想不出它在代码中需要是什么样子。

我正在尝试在 PySQL 中求解,但也在 Python DataFrame 中求解,因为我正在使用 Spark。

我可以很容易地使用 SQL 将每一列联合到一个输出中,但我希望有更有效的方法吗?

我已经将 melt 视为一个选项,并且 stack。但我不确定如何有效地做到这一点。

您可以按如下方式使用flatmap

val schema = df.schema
val df2 = df.flatMap(row => {
    val id = row.getString(0)
    (1 until row.size).map(i => {
        (id, schema(i).name, row.getString(i))
    })
}).toDF("ID", "ValueVv", "ValueDesc")

df2.show()
+---+-------+---------+
| ID|ValueVv|ValueDesc|
+---+-------+---------+
|101| Value1|        3|
|101| Value2|      520|
|101|Value40|     2001|
|102| Value1|       29|
|102| Value2|      530|
|102|Value40|     2020|
+---+-------+---------+

stack 来自此 的函数。

到目前为止,我已经这样做了,但我不想使用 pandas.. 而是仅使用 spark 数据框来实现它。

import pandas as pd
    
def main():
    
    data={'AnID':[2001,2002,2003,2004],
          'Name':['adam','jane','Sarah','Ryan'], 
          'Age':[23,22,21,24], 
          'Age1':[24,52,51,264], 
          'Age2':[263,262,261,264]}

    df=pd.DataFrame(data)

   #Iterate the DataFrame so that we can pivot the "columns" into Rows
    schema = df.columns  #gives me the names of the columns
       
    df.index[0]
    
    #loop Through the id to pivot on (assume it's the first one in the df)
    
    j = 0
    df2=pd.DataFrame()
    while j < schema.size:
        curvalid = schema[j]
        idname = schema[j] #get each element of the array
        vval = df[idname].values  #Grab all the values for the given "column"
 
    #then get the data for that array element and populate a new object
    
        

        i = 0
        while i < vval.size:
            df3=pd.DataFrame({'DemoDesc' : curvalid, 'DemoID' : vval[i]}, index=[i])
            df2 = df2.append(df3,ignore_index=True)
            i = i + 1;
        j = j + 1;
    print(df2) #print the dataframe
    return;
    
main()

它与 pivot 相反 - 它被称为 unpivot
在 Spark 中,unpivoting 是使用 stack 函数实现的。

使用 PySpark,如果您没有很多列,您可以这样做:

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(101, 3, 520, 2001),
     (102, 29, 530, 2020)],
    ['ID', 'Value1', 'Value2', 'Value40'])

df = df.select(
    "ID",
    F.expr("stack(3, Value1, 'Value1', Value2, 'Value2', Value40, 'Value40') as (ValueVv, ValueDesc)")
)

从您的示例中我看到您可能有很多列。在这种情况下,您可以使用这样的东西:

cols_to_unpivot = [f"`{c}`, \'{c}\'" for c in df.columns if c != 'ID']
stack_string = ", ".join(cols_to_unpivot)
df = df.select(
    "ID",
    F.expr(f"stack({len(cols_to_unpivot)}, {stack_string}) as (ValueVv, ValueDesc)")
)

两个版本的示例数据return

+---+-------+---------+
| ID|ValueVv|ValueDesc|
+---+-------+---------+
|101|      3|   Value1|
|101|    520|   Value2|
|101|   2001|  Value40|
|102|     29|   Value1|
|102|    530|   Value2|
|102|   2020|  Value40|
+---+-------+---------+