pyspark 将行转换为列
pyspark convert rows to columns
我有一个数据框,我需要将同一组的行转换为列。基本上枢转这些。下面是我的 df.
+------------+-------+-----+-------+
|Customer |ID |unit |order |
+------------+-------+-----+-------+
|John |123 |00015|1 |
|John |123 |00016|2 |
|John |345 |00205|3 |
|John |345 |00206|4 |
|John |789 |00283|5 |
|John |789 |00284|6 |
+------------+-------+-----+-------+
我需要上述结果数据作为..
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|state | ID_1 | unit_1 |seq_num_1 | ID_2 | unit_2 | seq_num_2 | ID_3 |unit_3 |seq_num_3 |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|John | 123 | 00015 | 1 | 345 | 00205 | 3 | 789 |00283 | 5 |
|John | 123 | 00016 | 2 | 345 | 00206 | 4 | 789 |00284 | 6 |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
我尝试使用 groupBy 和 pivot() 函数,但它的抛出错误表明找到了大的枢轴值。有没有什么方法可以在不使用 pivot() 函数的情况下获得结果……非常感谢任何帮助。
谢谢。
可能有多种方法可以做到这一点,但 pandas udf 可以是其中一种方法。这是一个基于您的数据的玩具示例:
df = pd.DataFrame({'Customer': ['John']*6,
'ID': [123]*2 + [345]*2 + [789]*2,
'unit': ['00015', '00016', '00205', '00206', '00283', '00284'],
'order': range(1, 7)})
sdf = spark.createDataFrame(df)
# Spark 2.4 syntax. Spark 3.0 is less verbose
return_types = 'state string, ID_1 int, unit_1 string, seq_num_1 int, ID_2int, unit_2 string, seq_num_2 int, ID_3 int, unit_3 string, seq_num_3 int'
@pandas_udf(returnType=return_types, functionType=PandasUDFType.GROUPED_MAP)
def convert_to_wide(pdf):
groups = pdf.groupby('ID')
out = pd.concat([group.set_index('Customer') for _, group in groups], axis=1).reset_index()
out.columns = ['state', 'ID_1', 'unit_1', 'seq_num_1', 'ID_2', 'unit_2', 'seq_num_2', 'ID_3', 'unit_3', 'seq_num_3']
return out
sdf.groupby('Customer').apply(convert_to_wide).show()
+-----+----+------+---------+----+------+---------+----+------+---------+
|state|ID_1|unit_1|seq_num_1|ID_2|unit_2|seq_num_2|ID_3|unit_3|seq_num_3|
+-----+----+------+---------+----+------+---------+----+------+---------+
| John| 123| 00015| 1| 345| 00205| 3| 789| 00283| 5|
| John| 123| 00016| 2| 345| 00206| 4| 789| 00284| 6|
+-----+----+------+---------+----+------+---------+----+------+---------+
这看起来像是使用 dense_rank() 聚合函数创建不同的通用序列(下面代码中的 dr
)的典型案例IDs在每组Customer下,然后在这个序列上进行旋转。我们可以使用 row_number() 做类似于 order
列的操作,以便它可以在 groupby:
中使用
from pyspark.sql import Window, functions as F
# below I added an extra row for a reference when the number of rows vary for different IDs
df = spark.createDataFrame([
('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])
添加两个 Window 规范:w1
得到 dense_rank() of ID通过 Customer 和 w2
获得 row_number() of order在相同的 Customer 和 ID 下。
w1 = Window.partitionBy('Customer').orderBy('ID')
w2 = Window.partitionBy('Customer','ID').orderBy('order')
在以上两个WinSpecs的基础上新增两个列:dr
(dense_rank)和sid
(row_number)
df1 = df.select(
"*",
F.dense_rank().over(w1).alias('dr'),
F.row_number().over(w2).alias('sid')
)
+--------+---+-----+-----+---+---+
|Customer| ID| unit|order| dr|sid|
+--------+---+-----+-----+---+---+
| John|123|00015| 1| 1| 1|
| John|123|00016| 2| 1| 2|
| John|345|00205| 3| 2| 1|
| John|345|00206| 4| 2| 2|
| John|789|00283| 5| 3| 1|
| John|789|00284| 6| 3| 2|
| John|789|00285| 7| 3| 3|
+--------+---+-----+-----+---+---+
找到max(dr)
,这样我们就可以预先定义要以range(1,N+1)
为基准的列表(这样可以提高pivot
方法的效率)。
N = df1.agg(F.max('dr')).first()[0]
Groupby Customer
、sid
并以 dr
为中心,然后进行聚合:
df_new = df1.groupby('Customer','sid') \
.pivot('dr', range(1,N+1)) \
.agg(
F.first('ID').alias('ID'),
F.first('unit').alias('unit'),
F.first('order').alias('order')
)
df_new.show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|1_ID|1_unit|1_order|2_ID|2_unit|2_order|3_ID|3_unit|3_order|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
| John| 1| 123| 00015| 1| 345| 00205| 3| 789| 00283| 5|
| John| 2| 123| 00016| 2| 345| 00206| 4| 789| 00284| 6|
| John| 3|null| null| null|null| null| null| 789| 00285| 7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
如果需要重命名列名:
import re
df_new.toDF(*['_'.join(reversed(re.split('_',c,1))) for c in df_new.columns]).show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|ID_1|unit_1|order_1|ID_2|unit_2|order_2|ID_3|unit_3|order_3|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
| John| 1| 123| 00015| 1| 345| 00205| 3| 789| 00283| 5|
| John| 2| 123| 00016| 2| 345| 00206| 4| 789| 00284| 6|
| John| 3|null| null| null|null| null| null| 789| 00285| 7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
下面是我的解决方案.. 进行排名然后拉平结果。
df = spark.createDataFrame([
('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])
rankedDF = df.withColumn("rank", row_number().over(Window.partitionBy("customer").orderBy("order")))
w1 = Window.partitionBy("customer").orderBy("order")
groupedDF = rankedDF.select("customer", "rank", collect_list("ID").over(w1).alias("ID"), collect_list("unit").over(w1).alias("unit"), collect_list("order").over(w1).alias("seq_num")).groupBy("customer", "rank").agg(max("ID").alias("ID"), max("unit").alias("unit"), max("seq_num").alias("seq_num") )
groupedColumns = [col("customer")]
pivotColumns = map(lambda i:map(lambda a:col(a)[i-1].alias(a + "_" + `i`), ["ID", "unit", "seq_num"]), [1,2,3])
flattenedCols = [item for sublist in pivotColumns for item in sublist]
finalDf=groupedDF.select(groupedColumns + flattenedCols)
我有一个数据框,我需要将同一组的行转换为列。基本上枢转这些。下面是我的 df.
+------------+-------+-----+-------+
|Customer |ID |unit |order |
+------------+-------+-----+-------+
|John |123 |00015|1 |
|John |123 |00016|2 |
|John |345 |00205|3 |
|John |345 |00206|4 |
|John |789 |00283|5 |
|John |789 |00284|6 |
+------------+-------+-----+-------+
我需要上述结果数据作为..
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|state | ID_1 | unit_1 |seq_num_1 | ID_2 | unit_2 | seq_num_2 | ID_3 |unit_3 |seq_num_3 |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|John | 123 | 00015 | 1 | 345 | 00205 | 3 | 789 |00283 | 5 |
|John | 123 | 00016 | 2 | 345 | 00206 | 4 | 789 |00284 | 6 |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
我尝试使用 groupBy 和 pivot() 函数,但它的抛出错误表明找到了大的枢轴值。有没有什么方法可以在不使用 pivot() 函数的情况下获得结果……非常感谢任何帮助。 谢谢。
可能有多种方法可以做到这一点,但 pandas udf 可以是其中一种方法。这是一个基于您的数据的玩具示例:
df = pd.DataFrame({'Customer': ['John']*6,
'ID': [123]*2 + [345]*2 + [789]*2,
'unit': ['00015', '00016', '00205', '00206', '00283', '00284'],
'order': range(1, 7)})
sdf = spark.createDataFrame(df)
# Spark 2.4 syntax. Spark 3.0 is less verbose
return_types = 'state string, ID_1 int, unit_1 string, seq_num_1 int, ID_2int, unit_2 string, seq_num_2 int, ID_3 int, unit_3 string, seq_num_3 int'
@pandas_udf(returnType=return_types, functionType=PandasUDFType.GROUPED_MAP)
def convert_to_wide(pdf):
groups = pdf.groupby('ID')
out = pd.concat([group.set_index('Customer') for _, group in groups], axis=1).reset_index()
out.columns = ['state', 'ID_1', 'unit_1', 'seq_num_1', 'ID_2', 'unit_2', 'seq_num_2', 'ID_3', 'unit_3', 'seq_num_3']
return out
sdf.groupby('Customer').apply(convert_to_wide).show()
+-----+----+------+---------+----+------+---------+----+------+---------+
|state|ID_1|unit_1|seq_num_1|ID_2|unit_2|seq_num_2|ID_3|unit_3|seq_num_3|
+-----+----+------+---------+----+------+---------+----+------+---------+
| John| 123| 00015| 1| 345| 00205| 3| 789| 00283| 5|
| John| 123| 00016| 2| 345| 00206| 4| 789| 00284| 6|
+-----+----+------+---------+----+------+---------+----+------+---------+
这看起来像是使用 dense_rank() 聚合函数创建不同的通用序列(下面代码中的 dr
)的典型案例IDs在每组Customer下,然后在这个序列上进行旋转。我们可以使用 row_number() 做类似于 order
列的操作,以便它可以在 groupby:
from pyspark.sql import Window, functions as F
# below I added an extra row for a reference when the number of rows vary for different IDs
df = spark.createDataFrame([
('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])
添加两个 Window 规范:w1
得到 dense_rank() of ID通过 Customer 和 w2
获得 row_number() of order在相同的 Customer 和 ID 下。
w1 = Window.partitionBy('Customer').orderBy('ID')
w2 = Window.partitionBy('Customer','ID').orderBy('order')
在以上两个WinSpecs的基础上新增两个列:dr
(dense_rank)和sid
(row_number)
df1 = df.select(
"*",
F.dense_rank().over(w1).alias('dr'),
F.row_number().over(w2).alias('sid')
)
+--------+---+-----+-----+---+---+
|Customer| ID| unit|order| dr|sid|
+--------+---+-----+-----+---+---+
| John|123|00015| 1| 1| 1|
| John|123|00016| 2| 1| 2|
| John|345|00205| 3| 2| 1|
| John|345|00206| 4| 2| 2|
| John|789|00283| 5| 3| 1|
| John|789|00284| 6| 3| 2|
| John|789|00285| 7| 3| 3|
+--------+---+-----+-----+---+---+
找到max(dr)
,这样我们就可以预先定义要以range(1,N+1)
为基准的列表(这样可以提高pivot
方法的效率)。
N = df1.agg(F.max('dr')).first()[0]
Groupby Customer
、sid
并以 dr
为中心,然后进行聚合:
df_new = df1.groupby('Customer','sid') \
.pivot('dr', range(1,N+1)) \
.agg(
F.first('ID').alias('ID'),
F.first('unit').alias('unit'),
F.first('order').alias('order')
)
df_new.show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|1_ID|1_unit|1_order|2_ID|2_unit|2_order|3_ID|3_unit|3_order|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
| John| 1| 123| 00015| 1| 345| 00205| 3| 789| 00283| 5|
| John| 2| 123| 00016| 2| 345| 00206| 4| 789| 00284| 6|
| John| 3|null| null| null|null| null| null| 789| 00285| 7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
如果需要重命名列名:
import re
df_new.toDF(*['_'.join(reversed(re.split('_',c,1))) for c in df_new.columns]).show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|ID_1|unit_1|order_1|ID_2|unit_2|order_2|ID_3|unit_3|order_3|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
| John| 1| 123| 00015| 1| 345| 00205| 3| 789| 00283| 5|
| John| 2| 123| 00016| 2| 345| 00206| 4| 789| 00284| 6|
| John| 3|null| null| null|null| null| null| 789| 00285| 7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
下面是我的解决方案.. 进行排名然后拉平结果。
df = spark.createDataFrame([
('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])
rankedDF = df.withColumn("rank", row_number().over(Window.partitionBy("customer").orderBy("order")))
w1 = Window.partitionBy("customer").orderBy("order")
groupedDF = rankedDF.select("customer", "rank", collect_list("ID").over(w1).alias("ID"), collect_list("unit").over(w1).alias("unit"), collect_list("order").over(w1).alias("seq_num")).groupBy("customer", "rank").agg(max("ID").alias("ID"), max("unit").alias("unit"), max("seq_num").alias("seq_num") )
groupedColumns = [col("customer")]
pivotColumns = map(lambda i:map(lambda a:col(a)[i-1].alias(a + "_" + `i`), ["ID", "unit", "seq_num"]), [1,2,3])
flattenedCols = [item for sublist in pivotColumns for item in sublist]
finalDf=groupedDF.select(groupedColumns + flattenedCols)