How to create a PySpark DataFrame pivoting by column names

I have a DataFrame:

+--------+--------------------+
|      id|                 col|
+--------+--------------------+
|11111111|[{"orderId":11111...|
+--------+--------------------+
|22222222|[{"orderId":22222...|
+--------+--------------------+

The col column contains JSON:

[
    {
        "orderId": 11111,
        "type": "111",
        "rate": 111,
        "sum": 111
    },
    {
        "orderId": 11111,
        "type": "222",
        "rate": 222,
        "sum": 222
    }
]

I want to group by id, but I don't know how to pivot this DF. The final result I would like to see:

+--------+-------+-----+-----+-----+
|      id|orderId| type| rate|  sum|
+--------+-------+-----+-----+-----+
|11111111|  11111|"111"|  111|  111|
+--------+-------+-----+-----+-----+
|11111111|  11111|"222"|  222|  222|    
+--------+-------+-----+-----+-----+
|22222222|  22222|"222"|  222|  222|    
+--------+-------+-----+-----+-----+
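To pin down the target shape independently of Spark, here is a minimal plain-Python sketch of the same reshaping: one output record per element of the JSON array, with the id repeated. The sample rows mirror the data above; the helper name flatten is hypothetical and only used for illustration.

```python
import json

# Sample rows mirroring the question: (id, JSON string holding an array of orders)
rows = [
    (11111111, '[{"orderId": 11111, "type": "111", "rate": 111, "sum": 111},'
               ' {"orderId": 11111, "type": "222", "rate": 222, "sum": 222}]'),
    (22222222, '[{"orderId": 22222, "type": "222", "rate": 222, "sum": 222}]'),
]


def flatten(rows):
    """Yield one flat tuple per JSON array element, repeating the id."""
    for row_id, payload in rows:
        for order in json.loads(payload):
            yield (row_id, order["orderId"], order["type"], order["rate"], order["sum"])


flat = list(flatten(rows))
for record in flat:
    print(record)
```

Seen this way, the operation is a flatten (explode) of a nested array rather than a pivot, since no values are turned into column names.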

This looks like an extraction rather than a pivot. Just do Df.select('id', 'col.*')

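I tried the following approach and it works. I'm not sure whether it is the best way to do it, so I'm open to suggestions and improvements.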

Input_df

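from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, regexp_replace, split
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Reuse the active session (already available as `spark` in notebooks and the pyspark shell)
spark = SparkSession.builder.getOrCreate()

# Each row: an id and a JSON string holding a list of order dictionaries
data = [
    (11111111, """[{"orderId": 11111,"type": "111","rate":111,"sum": 111} , {"orderId": 11111,"type": "222","rate":222,"sum": 222}]"""),
    (22222222, """[{"orderId": 22222,"type": "222","rate":222,"sum": 222}]"""),
]

schema = StructType([StructField("id", IntegerType()), StructField("col", StringType())])

df = spark.createDataFrame(data=data, schema=schema)
df.show(truncate=False)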

+--------+-----------------------------------------------------------------------------------------------------------------+
|id      |col                                                                                                              |
+--------+-----------------------------------------------------------------------------------------------------------------+
|11111111|[{"orderId": 11111,"type": "111","rate":111,"sum": 111} , {"orderId": 11111,"type": "222","rate":222,"sum": 222}]|
|22222222|[{"orderId": 22222,"type": "222","rate":222,"sum": 222}]                                                         |
+--------+-----------------------------------------------------------------------------------------------------------------+
# Split the list of dictionaries into one row per dictionary

df2 = (
    df.withColumn("data", explode(split(regexp_replace(col("col"), r"(^\[)|(\]$)", ""), ", ")))
      .withColumn("data", explode(split("data", "},")))
      .withColumn("data", explode(split(regexp_replace(col("data"), r"(^\{)|(\}$)", ""), ", ")))
      .withColumn("data_new", regexp_replace(col("data"), "}", ""))
)

df2 = df2.drop("col", "data")
df2.show(truncate=False)

+--------+-----------------------------------------------------+
|id      |data_new                                             |
+--------+-----------------------------------------------------+
|11111111|"orderId": 11111,"type": "111","rate":111,"sum": 111 |
|11111111|"orderId": 11111,"type": "222","rate":222,"sum": 222 |
|22222222|"orderId": 22222,"type": "222","rate":222,"sum": 222 |
+--------+-----------------------------------------------------+
# Get the individual "key": value pairs from each comma-separated dictionary string

pairs = split(df2["data_new"], ",")
df3 = (
    df2.withColumn("col_1", pairs.getItem(0))   # "orderId": ...
       .withColumn("col_2", pairs.getItem(1))   # "type": ...
       .withColumn("col_3", pairs.getItem(3))   # "sum": ... (index 2, "rate", is skipped)
       .drop("data_new")
)
df3.show(truncate=False)

+--------+----------------+-------------+-----------+
|id      |col_1           |col_2        |col_3      |
+--------+----------------+-------------+-----------+
|11111111|"orderId": 11111|"type": "111"|"sum": 111 |
|11111111|"orderId": 11111|"type": "222"|"sum": 222 |
|22222222|"orderId": 22222|"type": "222"|"sum": 222 |
+--------+----------------+-------------+-----------+
# Split on ':' and take the value at index 1 for each of the 3 columns: orderId, type, sum

col_1 = split(df3["col_1"], ":")
col_2 = split(df3["col_2"], ":")
col_3 = split(df3["col_3"], ":")

df4 = (
    df3.withColumn("orderId", col_1.getItem(1))
       .withColumn("type", col_2.getItem(1))
       .withColumn("sum", col_3.getItem(1))
       .drop("col_1", "col_2", "col_3")
)
df4.show(truncate=False)

+--------+-------+------+-----+
|id      |orderId|type  |sum  |
+--------+-------+------+-----+
|11111111| 11111 | "111"| 111 |
|11111111| 11111 | "222"| 222 |
|22222222| 22222 | "222"| 222 |
+--------+-------+------+-----+