如何在 pyspark 中将单个列动态转置为多个行?
How to dynamically transpose a single Column to multiple Rows in pyspark?
我有一个如下所示的数据框
ColName
a
b
c
d
e
f
g
h
i
j
k
l
并且基于我想将这些值转置到行中的特定参数。因此,例如,如果参数值为 3,则新数据框将如下所示
Col1
Col2
Col3
a
b
c
d
e
f
g
h
i
j
k
l
但是如果参数值为4,则如下所示
Col1
Col2
Col3
Col4
a
b
c
d
e
f
g
h
i
j
k
l
需要注意的几点:
- 列名不重要
- 单列中的项目数和参数都可以更改
知道如何在 pyspark 中实现这个吗?提前致谢。
您可以添加一些辅助列来旋转数据框:
import pyspark.sql.functions as F
x = 3
result = df.withColumn(
'id',
F.monotonically_increasing_id()
).withColumn(
'id2',
(F.col('id') / x).cast('int')
).withColumn(
'id3',
F.col('id') % x
).groupBy('id2').pivot('id3').agg(F.first('ColName')).orderBy('id2').drop('id2')
result.show()
+---+---+---+
| 0| 1| 2|
+---+---+---+
| a| b| c|
| d| e| f|
| g| h| i|
| j| k| l|
+---+---+---+
您可以结合使用 collect_list
和 row_number
来实现此目的。
步骤 1: 生成自定义 row_number。
from pyspark.sql.functions import floor, row_number, collect_list
from pyspark.sql.window import Window
no_of_columns = 3
df2 = df.withColumn("row_num", floor((row_number().over(Window.orderBy("ColName"))-1)/no_of_columns))
第 2 步: 使用此 row_number 对数据进行分组,然后使用 collect_list
创建列表。
df3 = df2.groupBy("row_num").agg(collect_list("ColName").alias("col_list"))
第 3 步: 使用 python 的列表理解来 select 此列表中的所有元素。
df3.select(*[df3.col_list[i].alias(f"col{i+1}") for i in range(no_of_columns)]).show()
输出:
+----+----+----+
|col1|col2|col3|
+----+----+----+
| a| b| c|
| d| e| f|
| g| h| i|
| j| k| l|
+----+----+----+
注意:参数no_of_columns
可以根据需要的输出列数进行更改。
我有一个如下所示的数据框
ColName |
---|
a |
b |
c |
d |
e |
f |
g |
h |
i |
j |
k |
l |
并且基于我想将这些值转置到行中的特定参数。因此,例如,如果参数值为 3,则新数据框将如下所示
Col1 | Col2 | Col3 |
---|---|---|
a | b | c |
d | e | f |
g | h | i |
j | k | l |
但是如果参数值为4,则如下所示
Col1 | Col2 | Col3 | Col4 |
---|---|---|---|
a | b | c | d |
e | f | g | h |
i | j | k | l |
需要注意的几点:
- 列名不重要
- 单列中的项目数和参数都可以更改
知道如何在 pyspark 中实现这个吗?提前致谢。
您可以添加一些辅助列来旋转数据框:
import pyspark.sql.functions as F
x = 3
result = df.withColumn(
'id',
F.monotonically_increasing_id()
).withColumn(
'id2',
(F.col('id') / x).cast('int')
).withColumn(
'id3',
F.col('id') % x
).groupBy('id2').pivot('id3').agg(F.first('ColName')).orderBy('id2').drop('id2')
result.show()
+---+---+---+
| 0| 1| 2|
+---+---+---+
| a| b| c|
| d| e| f|
| g| h| i|
| j| k| l|
+---+---+---+
您可以结合使用 collect_list
和 row_number
来实现此目的。
步骤 1: 生成自定义 row_number。
from pyspark.sql.functions import floor, row_number, collect_list
from pyspark.sql.window import Window
no_of_columns = 3
df2 = df.withColumn("row_num", floor((row_number().over(Window.orderBy("ColName"))-1)/no_of_columns))
第 2 步: 使用此 row_number 对数据进行分组,然后使用 collect_list
创建列表。
df3 = df2.groupBy("row_num").agg(collect_list("ColName").alias("col_list"))
第 3 步: 使用 python 的列表理解来 select 此列表中的所有元素。
df3.select(*[df3.col_list[i].alias(f"col{i+1}") for i in range(no_of_columns)]).show()
输出:
+----+----+----+
|col1|col2|col3|
+----+----+----+
| a| b| c|
| d| e| f|
| g| h| i|
| j| k| l|
+----+----+----+
注意:参数no_of_columns
可以根据需要的输出列数进行更改。