用 SQL 查询实现 PySpark 的 pivot(透视)

PySpark pivot as SQL query

希望写出与 PySpark 中 pivot(透视)操作完全等价的 SQL 查询。下面的代码先创建一个 pandas DataFrame:

import pandas as pd
# Toy long-format data: one row per (id, name) pair together with its count.
df = pd.DataFrame(
    [
        ('a', 'up', 6),
        ('a', 'down', 7),
        ('a', 'left', 5),
        ('b', 'up', 3),
        ('b', 'down', 4),
        ('b', 'left', 2),
        ('b', 'right', 9),
        ('c', 'up', 12),
        ('c', 'down', 4),
    ],
    columns=['id', 'name', 'count'],
)
#   id   name  count
# 0  a     up      6
# 1  a   down      7
# 2  a   left      5
# 3  b     up      3
# 4  b   down      4
# 5  b   left      2
# 6  b  right      9
# 7  c     up     12
# 8  c   down      4

接着把它转换为 PySpark DataFrame,并以 name 列为透视列生成数据透视表:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
ds = spark.createDataFrame(df)
# Pivot on `name`: one output row per `id`, one output column per distinct
# `name` value. `.max()` is called without column arguments, so it aggregates
# the numeric column(s) -- here only `count`.
# No pivot values are passed to pivot(), so Spark finds them itself; the
# output below shows them in sorted order (down, left, right, up).
# (id, name) combinations that do not exist come back as null, which
# toPandas() shows as NaN (so those columns print as floats).
dp = ds.groupBy('id').pivot('name').max().toPandas()
#   id  down  left  right  up
# 0  c     4   NaN    NaN  12
# 1  b     4   2.0    9.0   3
# 2  a     7   5.0    NaN   6

我想写出与 ds.groupBy('id').pivot('name').max() 完全等价的 SQL,即:

# Register the Spark DataFrame under the name `ds` so SQL can query it.
ds.createOrReplaceTempView('ds')
# NOTE: unfinished attempt -- the `...` placeholder stands for the
# FOR <column> IN (<values>) part the question asks how to write; this SQL
# does not parse as written.
dp = spark.sql(f"""
    SELECT * FROM ds
    PIVOT 
    (MAX(count)
     FOR
     ...)""").toPandas()

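参考 Spark SQL 的 PIVOT 子句文档。注意:SQL 结果中透视列的顺序与 IN 列表中值的顺序一致;而上面 DataFrame 的 pivot() 在未指定透视值时,生成的列是排好序的(down、left、right、up)。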

静态 PIVOT(显式列出所有透视值)

# Static PIVOT: every pivot value must be listed explicitly in the IN list.
# The output columns follow the IN-list order (up, down, left, right), unlike
# the DataFrame pivot, whose columns came out as down, left, right, up.
# The column not used by MAX(...) or FOR (`id`) acts as the grouping key, as
# the output below shows.
spark.sql(f"""
    SELECT * FROM ds
    PIVOT (
        MAX(count)
        FOR name in ('up','down','left','right')     
     )""").show()

+---+---+----+----+-----+
| id| up|down|left|right|
+---+---+----+----+-----+
|  c| 12|   4|null| null|
|  b|  3|   4|   2|    9|
|  a|  6|   7|   5| null|
+---+---+----+----+-----+

动态方法(自动生成透视值列表)

也可以动态生成 PIVOT 子句。下面是我围绕该子句写的一个通用包装函数。注意:透视值取自 pandas DataFrame df 中对应列的去重值,因此它们必须与临时视图 ds 中的数据一致。


def pivot_by(inp_df, by):
    """Build the ``FOR <by> in (...)`` part of a Spark SQL PIVOT clause.

    The pivot values are the distinct values of column ``by`` in the pandas
    DataFrame ``inp_df``, in order of first appearance (``Series.unique``).
    Each value is converted with ``str`` and wrapped in single quotes.
    Backslashes and single quotes inside a value are backslash-escaped, so a
    value such as ``it's`` cannot end the SQL string literal early.

    Args:
        inp_df: pandas DataFrame that holds the column to pivot on.
        by: Name of that column; it is inserted into the clause verbatim.

    Returns:
        A string such as ``FOR name in ('up','down','left','right')``.

    Raises:
        ValueError: if the column has no values, because ``IN ()`` is not
            valid SQL.
    """
    distinct_by = inp_df[by].unique()
    if len(distinct_by) == 0:
        raise ValueError(f"column {by!r} has no values to pivot on")

    def _quote(value):
        # Spark SQL string literals use backslash escapes (the default unless
        # spark.sql.parser.escapedStringLiterals is enabled). Escape the
        # backslash first so the escape added for the quote is not doubled.
        text = str(value).replace('\\', '\\\\').replace("'", "\\'")
        return f"'{text}'"

    # join() replaces the original first-element special case and repeated +=.
    distinct_name_str = ','.join(_quote(name) for name in distinct_by)
    return f'FOR {by} in ({distinct_name_str})'

# Generate the pivot clause from the pandas `df`, which is the same data
# behind the `ds` view.
pivot_clause_str = pivot_by(df,'name')

# Output: FOR name in ('up','down','left','right')

# Splice the generated clause into the PIVOT query; the result matches the
# hand-written static query.
spark.sql(f"""
    SELECT * FROM ds
    PIVOT (
        MAX(count)
        {pivot_clause_str}     
     )""").show()

+---+---+----+----+-----+
| id| up|down|left|right|
+---+---+----+----+-----+
|  c| 12|   4|null| null|
|  b|  3|   4|   2|    9|
|  a|  6|   7|   5| null|
+---+---+----+----+-----+

动态方法用法:改为按 id 列透视

# Pivot the other way: one output column per distinct `id`.
pivot_clause_str = pivot_by(df,'id')

# Output: FOR id in ('a','b','c')

ds.createOrReplaceTempView('ds')

# The query must go through the SparkSession `spark`; there is no `sql`
# object to call. The column not referenced by MAX(...) or FOR (`name`)
# becomes the row key, as the output below shows.
spark.sql(f"""
    SELECT * FROM ds
    PIVOT (
        MAX(count)
        {pivot_clause_str}     
     )""").show()

+-----+----+---+----+
| name|   a|  b|   c|
+-----+----+---+----+
| down|   7|  4|   4|
| left|   5|  2|null|
|   up|   6|  3|  12|
|right|null|  9|null|
+-----+----+---+----+