pyspark dataframe根据列后缀转置多列
pyspark dataframe transpose multiple columns based on column suffix
有一个数据框(c 到 e 列最多有 15 个变体)
cola, colb, colc_1, cold_1, cole_1, colc_2, cold_2, cole_2...
1, 2, 3, 4, 5, 6, 7, 8
想要数据框
cola, colb, new_col colc, cold, cole,
1, 2, _1, 3, 4, 5
1, 2, _2, 6, 7, 8
希望将 colc 转置为 cole 并使用这些列的后缀 (_1, _2..._15) 作为转置字段的值 (new_col)
我可以在 Pandas 中使用 melt 和 pivot 执行此操作,但此示例中的数据帧太大而无法转换为 Pandas df,需要在 pyspark 或 aws 中完成胶水
您可以尝试 select()
和 union()
。下面的代码首先列出了基本逻辑,然后使用 reduce()
函数来消除所有中间数据帧:
from pyspark.sql import functions as F
from functools import reduce
df = spark.createDataFrame([
(1,2,3,4,5,6,7,8)
, (11,12,13,14,15,16,17,18)
, (21,22,23,24,25,26,27,28)
],
[ 'cola', 'colb'
, 'colc_1', 'cold_1', 'cole_1'
, 'colc_2', 'cold_2', 'cole_2'
])
# create df1 with all columns for new_col = '_1'
df1 = df.select('cola', 'colb', F.lit('_1'), 'colc_1', 'cold_1', 'cole_1')
df1.show()
#+----+----+---+------+------+------+
#|cola|colb| _1|colc_1|cold_1|cole_1|
#+----+----+---+------+------+------+
#| 1| 2| _1| 3| 4| 5|
#| 11| 12| _1| 13| 14| 15|
#| 21| 22| _1| 23| 24| 25|
#+----+----+---+------+------+------+
# do the similar for '_2'
df2 = df.select('cola', 'colb', F.lit('_2'), *["col{}_2".format(i) for i in list("cde")])
#+----+----+---+------+------+------+
#|cola|colb| _2|colc_2|cold_2|cole_2|
#+----+----+---+------+------+------+
#| 1| 2| _2| 6| 7| 8|
#| 11| 12| _2| 16| 17| 18|
#| 21| 22| _2| 26| 27| 28|
#+----+----+---+------+------+------+
# then union these two dataframe and adjust the final column names
df_new = df1.union(df2).toDF('cola', 'colb', 'new_col', 'colc', 'cold', 'cole')
df_new.show()
#+----+----+-------+----+----+----+
#|cola|colb|new_col|colc|cold|cole|
#+----+----+-------+----+----+----+
#| 1| 2| _1| 3| 4| 5|
#| 11| 12| _1| 13| 14| 15|
#| 21| 22| _1| 23| 24| 25|
#| 1| 2| _2| 6| 7| 8|
#| 11| 12| _2| 16| 17| 18|
#| 21| 22| _2| 26| 27| 28|
#+----+----+-------+----+----+----+
接下来我们可以使用reduce()
函数来处理没有上面临时df1,df2等的所有列组:
# setup the list of columns to be normalized
normalize_cols = ["col{}".format(c) for c in list("cde")]
# ["colc", "cold", "cole"]
# change N to 16 to cover new_col from '_1' to '_15'
N = 3
# use reduce to handle all groups
df_new = reduce(
lambda d1,d2: d1.union(d2)
, [ df.select('cola', 'colb', F.lit('_{}'.format(i)), *["{}_{}".format(c,i) for c in normalize_cols]) for i in range(1,N) ]
).toDF('cola', 'colb', 'new_col', *normalize_cols)
另一种方法是使用 F.array()
和 F.explode()
(对所有 _N
使用 reduce()):
df.withColumn('d1', F.array(F.lit('_1'), *['col{}_1'.format(c) for c in list("cde")])) \
.withColumn('d2', F.array(F.lit('_2'), *['col{}_2'.format(c) for c in list("cde")])) \
.withColumn('h', F.array('d1', 'd2')) \
.withColumn('h1', F.explode('h')) \
.select('cola', 'colb', *[ F.col('h1')[i] for i in range(4)]) \
.toDF('cola', 'colb', 'new_col', 'colc', 'cold', 'cole') \
.show()
根据评论更新:
为了对数据帧进行非规范化,我使用 F.array()
然后 F.collect_list
将列分组到数组列表中,然后引用 groupby()
结果中的值:
使用Window函数设置collect_list中元素的顺序:
N = 3
normalize_cols = ["col{}".format(c) for c in list("cde")]
# win spec so that element in collect_list are sorted based on 'new_col'
win = Window.partitionBy('cola', 'colb').orderBy('new_col')
df_new.withColumn('cols', F.array(normalize_cols)) \
.withColumn('clist', F.collect_list('cols').over(win)) \
.groupby('cola', 'colb').agg(F.last('clist').alias('clist1')) \
.select('cola', 'colb', *[ F.col('clist1')[i].alias('c{}'.format(i)) for i in range(N-1)]) \
.select('cola', 'colb', *[ F.col('c{}'.format(i))[j].alias('{}_{}'.format(normalize_cols[j],i+1)) for i in range(N-1) for j in range(len(normalize_cols)) ]) \
.show()
# +----+----+------+------+------+------+------+------+
# |cola|colb|colc_1|cold_1|cole_1|colc_2|cold_2|cole_2|
# +----+----+------+------+------+------+------+------+
# | 11| 12| 13| 14| 15| 16| 17| 18|
# | 21| 22| 23| 24| 25| 26| 27| 28|
# | 1| 2| 3| 4| 5| 6| 7| 8|
# +----+----+------+------+------+------+------+------+
一些解释:
F.last()
in groupby.agg() returns 来自相同 partitionBy(groupby) 下的 Window 函数的完整 collect_list
- 第一个
select()
将collect_list()转换成c0, c1
- 第二个
select()
将c0转换为colc_1、cold_1、cole_1和c1 至 colc_2、cold_2、cole_2
有一个数据框(c 到 e 列最多有 15 个变体)
cola, colb, colc_1, cold_1, cole_1, colc_2, cold_2, cole_2...
1, 2, 3, 4, 5, 6, 7, 8
想要数据框
cola, colb, new_col colc, cold, cole,
1, 2, _1, 3, 4, 5
1, 2, _2, 6, 7, 8
希望将 colc 转置为 cole 并使用这些列的后缀 (_1, _2..._15) 作为转置字段的值 (new_col)
我可以在 Pandas 中使用 melt 和 pivot 执行此操作,但此示例中的数据帧太大而无法转换为 Pandas df,需要在 pyspark 或 aws 中完成胶水
您可以尝试 select()
和 union()
。下面的代码首先列出了基本逻辑,然后使用 reduce()
函数来消除所有中间数据帧:
from pyspark.sql import functions as F
from functools import reduce
df = spark.createDataFrame([
(1,2,3,4,5,6,7,8)
, (11,12,13,14,15,16,17,18)
, (21,22,23,24,25,26,27,28)
],
[ 'cola', 'colb'
, 'colc_1', 'cold_1', 'cole_1'
, 'colc_2', 'cold_2', 'cole_2'
])
# create df1 with all columns for new_col = '_1'
df1 = df.select('cola', 'colb', F.lit('_1'), 'colc_1', 'cold_1', 'cole_1')
df1.show()
#+----+----+---+------+------+------+
#|cola|colb| _1|colc_1|cold_1|cole_1|
#+----+----+---+------+------+------+
#| 1| 2| _1| 3| 4| 5|
#| 11| 12| _1| 13| 14| 15|
#| 21| 22| _1| 23| 24| 25|
#+----+----+---+------+------+------+
# do the similar for '_2'
df2 = df.select('cola', 'colb', F.lit('_2'), *["col{}_2".format(i) for i in list("cde")])
#+----+----+---+------+------+------+
#|cola|colb| _2|colc_2|cold_2|cole_2|
#+----+----+---+------+------+------+
#| 1| 2| _2| 6| 7| 8|
#| 11| 12| _2| 16| 17| 18|
#| 21| 22| _2| 26| 27| 28|
#+----+----+---+------+------+------+
# then union these two dataframe and adjust the final column names
df_new = df1.union(df2).toDF('cola', 'colb', 'new_col', 'colc', 'cold', 'cole')
df_new.show()
#+----+----+-------+----+----+----+
#|cola|colb|new_col|colc|cold|cole|
#+----+----+-------+----+----+----+
#| 1| 2| _1| 3| 4| 5|
#| 11| 12| _1| 13| 14| 15|
#| 21| 22| _1| 23| 24| 25|
#| 1| 2| _2| 6| 7| 8|
#| 11| 12| _2| 16| 17| 18|
#| 21| 22| _2| 26| 27| 28|
#+----+----+-------+----+----+----+
接下来我们可以使用reduce()
函数来处理没有上面临时df1,df2等的所有列组:
# setup the list of columns to be normalized
normalize_cols = ["col{}".format(c) for c in list("cde")]
# ["colc", "cold", "cole"]
# change N to 16 to cover new_col from '_1' to '_15'
N = 3
# use reduce to handle all groups
df_new = reduce(
lambda d1,d2: d1.union(d2)
, [ df.select('cola', 'colb', F.lit('_{}'.format(i)), *["{}_{}".format(c,i) for c in normalize_cols]) for i in range(1,N) ]
).toDF('cola', 'colb', 'new_col', *normalize_cols)
另一种方法是使用 F.array()
和 F.explode()
(对所有 _N
使用 reduce()):
df.withColumn('d1', F.array(F.lit('_1'), *['col{}_1'.format(c) for c in list("cde")])) \
.withColumn('d2', F.array(F.lit('_2'), *['col{}_2'.format(c) for c in list("cde")])) \
.withColumn('h', F.array('d1', 'd2')) \
.withColumn('h1', F.explode('h')) \
.select('cola', 'colb', *[ F.col('h1')[i] for i in range(4)]) \
.toDF('cola', 'colb', 'new_col', 'colc', 'cold', 'cole') \
.show()
根据评论更新:
为了对数据帧进行非规范化,我使用 F.array()
然后 F.collect_list
将列分组到数组列表中,然后引用 groupby()
结果中的值:
使用Window函数设置collect_list中元素的顺序:
N = 3
normalize_cols = ["col{}".format(c) for c in list("cde")]
# win spec so that element in collect_list are sorted based on 'new_col'
win = Window.partitionBy('cola', 'colb').orderBy('new_col')
df_new.withColumn('cols', F.array(normalize_cols)) \
.withColumn('clist', F.collect_list('cols').over(win)) \
.groupby('cola', 'colb').agg(F.last('clist').alias('clist1')) \
.select('cola', 'colb', *[ F.col('clist1')[i].alias('c{}'.format(i)) for i in range(N-1)]) \
.select('cola', 'colb', *[ F.col('c{}'.format(i))[j].alias('{}_{}'.format(normalize_cols[j],i+1)) for i in range(N-1) for j in range(len(normalize_cols)) ]) \
.show()
# +----+----+------+------+------+------+------+------+
# |cola|colb|colc_1|cold_1|cole_1|colc_2|cold_2|cole_2|
# +----+----+------+------+------+------+------+------+
# | 11| 12| 13| 14| 15| 16| 17| 18|
# | 21| 22| 23| 24| 25| 26| 27| 28|
# | 1| 2| 3| 4| 5| 6| 7| 8|
# +----+----+------+------+------+------+------+------+
一些解释:
F.last()
in groupby.agg() returns 来自相同 partitionBy(groupby) 下的 Window 函数的完整 collect_list
- 第一个
select()
将collect_list()转换成c0, c1 - 第二个
select()
将c0转换为colc_1、cold_1、cole_1和c1 至 colc_2、cold_2、cole_2