PySpark: how to group row-based values from a data frame
I need to group the row-based values for each index from the data frame below:
+-----+------+------+------+------+------+------+
|index|amount|  dept|  date|amount|  dept|  date|
+-----+------+------+------+------+------+------+
|    1|  1000|  acnt|2-4-21|  2000| acnt2|2-4-21|
|    2|  1500| sales|2-3-21|  1600|sales2|2-3-21|
+-----+------+------+------+------+------+------+
Since the index is unique for every row and the date is the same in both columns, I need to group the row values as below:
+-----+---------+------------+------+
|index|   amount|        dept|  date|
+-----+---------+------------+------+
|    1|1000,2000|  acnt,acnt2|2-4-21|
|    2|1500,1600|sales,sales2|2-3-21|
+-----+---------+------------+------+
I have seen many options for grouping columns, but I am looking specifically at row-based values in PySpark.
Is there any solution to populate the result as above?
There are two ways, depending on what you want:
from pyspark.sql.functions import struct, array, col

df = df.withColumn('amount', struct(col('amount1'), col('amount2')))  # as a struct column
df = df.withColumn('amount', array(col('amount1'), col('amount2')))   # as an array column
If you have two columns with the same name (as in your example), just recreate your df
(not needed if it comes from a join... just use aliases):
cols = ['index', 'amount1', 'dept1', 'date1', 'amount2', 'dept2', 'date2']
df = df.toDF(*cols)
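With unique column names in place, the comma-separated output from the question can also be produced with concat_ws instead of struct/array. A minimal sketch, assuming the renamed columns above ('amount1', 'dept1', 'date1', 'amount2', 'dept2', 'date2') and that the two date columns always hold the same value:

from pyspark.sql.functions import concat_ws, col

# join the duplicated columns into comma-separated strings and keep one date column
grouped_df = (df
    .withColumn('amount', concat_ws(',', 'amount1', 'amount2'))
    .withColumn('dept', concat_ws(',', 'dept1', 'dept2'))
    .select('index', 'amount', 'dept', col('date1').alias('date')))
grouped_df.show()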
Ideally, this needs to be fixed upstream (check whether you are doing a join in the upstream code and try to select only the appropriate aliases so you retain unique columns only).
That said, you can build a helper dictionary and new column names, and then create a helper spark function:
from pyspark.sql import functions as F
from itertools import groupby
Create a new list of column names with a counter:
l = []
s = {}
for i in df.columns:
    l.append(f"{i}_{s.get(i)}" if i in s else i)
    s[i] = s.get(i, 0) + 1
# ['index', 'amount', 'dept', 'date', 'amount_1', 'dept_1', 'date_1']
Then use this new list to rename the columns of the existing dataframe, and use a helper function that concatenates columns based on a duplicate check:
def mysparkfunc(cols):
    # group the column names by their base name (text before the "_<counter>" suffix)
    cols = [list(v) for k, v in groupby(sorted(cols), lambda x: x.split("_")[0])]
    # concatenate duplicated columns with a comma, keep 'date' and unique columns as-is
    return [F.concat_ws(",", *col).alias(col[0])
            if len(col) > 1 and col[0] != 'date'
            else F.col(col[0]) for col in cols]

df.toDF(*l).select(*mysparkfunc(l)).show()
+---------+------+------------+-----+
| amount| date| dept|index|
+---------+------+------------+-----+
|1000,2000|2-4-21| acnt,acnt2| 1|
|1500,1600|2-3-21|sales,sales2| 2|
+---------+------+------------+-----+
Full code:
from pyspark.sql import functions as F
from itertools import groupby

l = []
s = {}
for i in df.columns:
    l.append(f"{i}_{s.get(i)}" if i in s else i)
    s[i] = s.get(i, 0) + 1

def mysparkfunc(cols):
    cols = [list(v) for k, v in groupby(sorted(cols), lambda x: x.split("_")[0])]
    return [F.concat_ws(",", *col).alias(col[0])
            if len(col) > 1 and col[0] != 'date'
            else F.col(col[0]) for col in cols]

df.toDF(*l).select(*mysparkfunc(l)).show()
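For reference, a minimal sketch of building the sample dataframe from the question so the code above can be run end to end (the SparkSession setup and the literal values are assumptions based on the sample table):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample data with duplicated column names, mirroring the question's table
data = [(1, 1000, "acnt", "2-4-21", 2000, "acnt2", "2-4-21"),
        (2, 1500, "sales", "2-3-21", 1600, "sales2", "2-3-21")]
df = spark.createDataFrame(data, ["index", "amount", "dept", "date", "amount", "dept", "date"])
df.show()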
Suppose you have an initial dataframe as below:
INPUT:
+------+------+------+------+
| dept| dept|amount|amount|
+------+------+------+------+
|sales1|sales2| 1| 1|
|sales1|sales2| 2| 2|
|sales1|sales2| 3| 3|
|sales1|sales2| 4| 4|
|sales1|sales2| 5| 5|
+------+------+------+------+
- Rename the columns:
newColumns = ["dept1","dept2","amount1","amount2"]
new_clms_df = df.toDF(*newColumns)
new_clms_df.show()
+------+------+-------+-------+
| dept1| dept2|amount1|amount2|
+------+------+-------+-------+
|sales1|sales2| 1| 1|
|sales1|sales2| 2| 2|
|sales1|sales2| 3| 3|
|sales1|sales2| 4| 4|
|sales1|sales2| 5| 5|
+------+------+-------+-------+
- Derive the final output columns:
from pyspark.sql.functions import concat_ws

final_df = new_clms_df.\
    withColumn('dept', concat_ws(',', new_clms_df['dept1'], new_clms_df['dept2'])).\
    withColumn('amount', concat_ws(',', new_clms_df['amount1'], new_clms_df['amount2']))
final_df.show()
+------+------+-------+-------+-------------+------+
| dept1| dept2|amount1|amount2| dept|amount|
+------+------+-------+-------+-------------+------+
|sales1|sales2| 1| 1|sales1,sales2| 1,1|
|sales1|sales2| 2| 2|sales1,sales2| 2,2|
|sales1|sales2| 3| 3|sales1,sales2| 3,3|
|sales1|sales2| 4| 4|sales1,sales2| 4,4|
|sales1|sales2| 5| 5|sales1,sales2| 5,5|
+------+------+-------+-------+-------------+------+
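If only the derived columns are needed in the final output, a short select on top of final_df from the previous step drops the intermediate columns (just a sketch):

# keep only the grouped columns
final_df.select('dept', 'amount').show()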