在 Spark SQL 中将列转换为行

Convert columns to rows in Spark SQL


ID Value1 Value2 Value40
101 3 520 2001
102 29 530 2020

我想获取此数据并将其转换为 KV 样式对

ID ValueVv ValueDesc
101 3 Value1
101 520 Value2
101 2001 Value40


我正在尝试在 PySQL 中求解,但也在 Python DataFrame 中求解,因为我正在使用 Spark。

我可以很容易地使用 SQL 将每一列联合到一个输出中,但我希望有更有效的方法吗?

我已经将 melt 视为一个选项,并且 stack。但我不确定如何有效地做到这一点。


val schema = df.schema
val df2 = df.flatMap(row => {
    val id = row.getString(0)
    (1 until row.size).map(i => {
        (id, schema(i).name, row.getString(i))
}).toDF("ID", "ValueVv", "ValueDesc")

| ID|ValueVv|ValueDesc|
|101| Value1|        3|
|101| Value2|      520|
|101|Value40|     2001|
|102| Value1|       29|
|102| Value2|      530|
|102|Value40|     2020|

stack 来自此 的函数。

到目前为止,我已经这样做了,但我不想使用 pandas.. 而是仅使用 spark 数据框来实现它。

import pandas as pd
def main():


   #Iterate the DataFrame so that we can pivot the "columns" into Rows
    schema = df.columns  #gives me the names of the columns
    #loop Through the id to pivot on (assume it's the first one in the df)
    j = 0
    while j < schema.size:
        curvalid = schema[j]
        idname = schema[j] #get each element of the array
        vval = df[idname].values  #Grab all the values for the given "column"
    #then get the data for that array element and populate a new object

        i = 0
        while i < vval.size:
            df3=pd.DataFrame({'DemoDesc' : curvalid, 'DemoID' : vval[i]}, index=[i])
            df2 = df2.append(df3,ignore_index=True)
            i = i + 1;
        j = j + 1;
    print(df2) #print the dataframe

它与 pivot 相反 - 它被称为 unpivot
在 Spark 中,unpivoting 是使用 stack 函数实现的。

使用 PySpark,如果您没有很多列,您可以这样做:

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(101, 3, 520, 2001),
     (102, 29, 530, 2020)],
    ['ID', 'Value1', 'Value2', 'Value40'])

df = df.select(
    F.expr("stack(3, Value1, 'Value1', Value2, 'Value2', Value40, 'Value40') as (ValueVv, ValueDesc)")


cols_to_unpivot = [f"`{c}`, \'{c}\'" for c in df.columns if c != 'ID']
stack_string = ", ".join(cols_to_unpivot)
df = df.select(
    F.expr(f"stack({len(cols_to_unpivot)}, {stack_string}) as (ValueVv, ValueDesc)")


| ID|ValueVv|ValueDesc|
|101|      3|   Value1|
|101|    520|   Value2|
|101|   2001|  Value40|
|102|     29|   Value1|
|102|    530|   Value2|
|102|   2020|  Value40|