pyspark convert rows to columns

I have a dataframe and I need to convert rows belonging to the same group into columns, basically pivoting them. Below is my df.

+------------+-------+-----+-------+
|Customer    |ID     |unit |order  |
+------------+-------+-----+-------+
|John        |123    |00015|1      |
|John        |123    |00016|2      |
|John        |345    |00205|3      |
|John        |345    |00206|4      |
|John        |789    |00283|5      |
|John        |789    |00284|6      |
+------------+-------+-----+-------+

I need the above data to end up as:

+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|state   | ID_1  | unit_1 |seq_num_1 | ID_2   | unit_2 | seq_num_2 | ID_3   |unit_3 |seq_num_3 |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|John    | 123   | 00015  | 1        |  345   | 00205  | 3         |  789   |00283  | 5        |
|John    | 123   | 00016  | 2        |  345   | 00206  | 4         |  789   |00284  | 6        |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+

I tried using groupBy and the pivot() function, but it throws an error saying too many pivot values were found. Is there any way to get this result without using the pivot() function? Any help is greatly appreciated. Thanks.
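
For illustration, the kind of call that hits this limit looks roughly like the sketch below (a guess at the shape of the attempt, not the original code); pivot() fails once the pivot column has more distinct values than spark.sql.pivotMaxValues allows, which defaults to 10000:

from pyspark.sql import functions as F

# illustrative only, not the original code: pivoting directly on a
# high-cardinality column such as unit; pivot() enumerates every distinct
# value of the pivot column and fails when that count exceeds
# spark.sql.pivotMaxValues (default 10000)
wide = df.groupBy('Customer').pivot('unit').agg(F.first('order'))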

There are probably several ways to do this, but a pandas UDF is one of them. Here is a toy example based on your data:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = pd.DataFrame({'Customer': ['John']*6, 
                   'ID': [123]*2 + [345]*2 + [789]*2, 
                   'unit': ['00015', '00016', '00205', '00206', '00283', '00284'], 
                   'order': range(1, 7)})
sdf = spark.createDataFrame(df)

# Spark 2.4 syntax. Spark 3.0 is less verbose
return_types = 'state string, ID_1 int, unit_1 string, seq_num_1 int, ID_2 int, unit_2 string, seq_num_2 int, ID_3 int, unit_3 string, seq_num_3 int'
@pandas_udf(returnType=return_types, functionType=PandasUDFType.GROUPED_MAP)
def convert_to_wide(pdf):
    groups = pdf.groupby('ID')
    out = pd.concat([group.set_index('Customer') for _, group in groups], axis=1).reset_index()
    out.columns = ['state', 'ID_1', 'unit_1', 'seq_num_1', 'ID_2', 'unit_2', 'seq_num_2', 'ID_3', 'unit_3', 'seq_num_3']
    return out

sdf.groupby('Customer').apply(convert_to_wide).show()

+-----+----+------+---------+----+------+---------+----+------+---------+
|state|ID_1|unit_1|seq_num_1|ID_2|unit_2|seq_num_2|ID_3|unit_3|seq_num_3|
+-----+----+------+---------+----+------+---------+----+------+---------+
| John| 123| 00015|        1| 345| 00205|        3| 789| 00283|        5|
| John| 123| 00016|        2| 345| 00206|        4| 789| 00284|        6|
+-----+----+------+---------+----+------+---------+----+------+---------+
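
As the comment above notes, Spark 3.0 is less verbose: the same grouped-map logic can be passed to applyInPandas together with the schema string, without the pandas_udf decorator. A minimal sketch, assuming the same sdf and return_types as above; convert_to_wide_plain is just an undecorated copy of the function body:

# Spark 3.0+ variant: plain function plus the schema string
def convert_to_wide_plain(pdf):
    groups = pdf.groupby('ID')
    out = pd.concat([group.set_index('Customer') for _, group in groups], axis=1).reset_index()
    out.columns = ['state', 'ID_1', 'unit_1', 'seq_num_1', 'ID_2', 'unit_2', 'seq_num_2',
                   'ID_3', 'unit_3', 'seq_num_3']
    return out

sdf.groupby('Customer').applyInPandas(convert_to_wide_plain, schema=return_types).show()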

This looks like a typical case for the dense_rank() window function: create a generic sequence (dr in the code below) over the distinct IDs within each Customer group, and then pivot on that sequence. We can do something similar with row_number() on the order column so that it can be used in the groupby:

from pyspark.sql import Window, functions as F

# below I added an extra row as a reference for the case when the number of rows varies across IDs
df = spark.createDataFrame([
    ('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
    ('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
    ('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])

Add two Window specs: w1 to get the dense_rank() of ID over each Customer, and w2 to get the row_number() of order within the same Customer and ID.

w1 = Window.partitionBy('Customer').orderBy('ID')
w2 = Window.partitionBy('Customer','ID').orderBy('order')

Based on the above two WindowSpecs, add two new columns: dr (dense_rank) and sid (row_number):

df1 = df.select(
    "*", 
    F.dense_rank().over(w1).alias('dr'), 
    F.row_number().over(w2).alias('sid')
)
df1.show()
+--------+---+-----+-----+---+---+
|Customer| ID| unit|order| dr|sid|
+--------+---+-----+-----+---+---+
|    John|123|00015|    1|  1|  1|
|    John|123|00016|    2|  1|  2|
|    John|345|00205|    3|  2|  1|
|    John|345|00206|    4|  2|  2|
|    John|789|00283|    5|  3|  1|
|    John|789|00284|    6|  3|  2|
|    John|789|00285|    7|  3|  3|
+--------+---+-----+-----+---+---+

Find max(dr) so that we can pre-define the list of values range(1, N+1) to pivot on (this improves the efficiency of the pivot method):

N = df1.agg(F.max('dr')).first()[0]

Group by Customer and sid, pivot on dr, and then run the aggregation:

df_new = df1.groupby('Customer','sid') \
    .pivot('dr', range(1,N+1)) \
    .agg(
        F.first('ID').alias('ID'),
        F.first('unit').alias('unit'),
        F.first('order').alias('order')
)

df_new.show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|1_ID|1_unit|1_order|2_ID|2_unit|2_order|3_ID|3_unit|3_order|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|    John|  1| 123| 00015|      1| 345| 00205|      3| 789| 00283|      5|
|    John|  2| 123| 00016|      2| 345| 00206|      4| 789| 00284|      6|
|    John|  3|null|  null|   null|null|  null|   null| 789| 00285|      7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+

If you need to rename the columns:

import re
df_new.toDF(*['_'.join(reversed(re.split('_',c,1))) for c in df_new.columns]).show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|ID_1|unit_1|order_1|ID_2|unit_2|order_2|ID_3|unit_3|order_3|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|    John|  1| 123| 00015|      1| 345| 00205|      3| 789| 00283|      5|
|    John|  2| 123| 00016|      2| 345| 00206|      4| 789| 00284|      6|
|    John|  3|null|  null|   null|null|  null|   null| 789| 00285|      7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+

Below is my solution: do a ranking and then flatten the results.

from pyspark.sql import Window
from pyspark.sql.functions import row_number, collect_list, col, max  # note: max shadows the Python builtin

df = spark.createDataFrame([
    ('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
    ('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
    ('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])

rankedDF = df.withColumn("rank", row_number().over(Window.partitionBy("customer").orderBy("order")))
w1 = Window.partitionBy("customer").orderBy("order")
groupedDF = rankedDF.select(
    "customer", "rank",
    collect_list("ID").over(w1).alias("ID"),
    collect_list("unit").over(w1).alias("unit"),
    collect_list("order").over(w1).alias("seq_num")
).groupBy("customer", "rank").agg(
    max("ID").alias("ID"),
    max("unit").alias("unit"),
    max("seq_num").alias("seq_num")
)
groupedColumns = [col("customer")]
pivotColumns = [[col(a)[i - 1].alias(a + "_" + str(i)) for a in ["ID", "unit", "seq_num"]] for i in [1, 2, 3]]
flattenedCols = [item for sublist in pivotColumns for item in sublist]
finalDf = groupedDF.select(groupedColumns + flattenedCols)
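
One follow-up on this approach: the [1, 2, 3] above hard-codes three ID slots per customer. If the number of IDs per customer can vary, the slot count can be derived from the data first, much like the max(dr) step in the pivot-based answer. A small sketch, assuming the same df:

from pyspark.sql import functions as F

# derive the number of ID slots per customer instead of hard-coding [1, 2, 3]
num_slots = (df.groupBy("Customer")
               .agg(F.countDistinct("ID").alias("n"))
               .agg(F.max("n"))
               .first()[0])
slots = list(range(1, num_slots + 1))   # use slots in place of [1, 2, 3] above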