Pyspark how to group row based value from a data frame

I need to group row-based values for each index of the data frame below:

+-----+------+------+------+------+------+------+
|index|amount| dept | date |amount| dept | date |
+-----+------+------+------+------+------+------+
|    1|1000  | acnt |2-4-21|2000  | acnt2|2-4-21|
|    2|1500  | sales|2-3-21|1600  |sales2|2-3-21|
+-----+------+------+------+------+------+------+

Since the index is unique for every row and the date is the same, I need to group the row values as below:

+-----+---------+------------+------+
|index|amount   |dept        |date  |
+-----+---------+------------+------+
|    1|1000,2000|acnt,acnt2  |2-4-21|
|    2|1500,1600|sales,sales2|2-3-21|
+-----+---------+------------+------+

I see a lot of options for grouping columns, but is there any solution specifically for row-based values in pyspark to populate the result as above?

There are two ways to do this, depending on what you want:

from pyspark.sql.functions import struct, array, col

df = df.withColumn('amount', struct(col('amount1'), col('amount2')))  # struct of both values
df = df.withColumn('amount', array(col('amount1'), col('amount2')))   # array of both values

If you have two columns with the same name (as in your example), just recreate your df (not needed if it comes from a join; just use an alias there):

cols = ['index', 'amount1', 'dept1', 'date1', 'amount2', 'dept2', 'date2']
df = df.toDF(*cols)
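
If the goal is the comma-separated output from the question, a minimal sketch (assuming Spark 2.4+ and the renamed columns above) flattens the array variant with array_join:

from pyspark.sql.functions import array, array_join, col

# possible continuation after the rename (hypothetical): flatten the array variant
# into comma-separated strings; cast to string so array_join accepts numeric amounts
grouped = (df
    .withColumn('amount', array_join(array(col('amount1').cast('string'),
                                            col('amount2').cast('string')), ','))
    .withColumn('dept', array_join(array(col('dept1'), col('dept2')), ','))
    .select('index', 'amount', 'dept', col('date1').alias('date')))
grouped.show()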

Ideally this needs to be fixed upstream (check if you have a join in your upstream code and try to select only the appropriate aliases to retain the unique columns only).
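
As an illustration of that upstream fix, here is a minimal sketch with two hypothetical frames df1 and df2 joined on index and date, selecting uniquely named output columns:

from pyspark.sql import functions as F

# hypothetical upstream join: select only uniquely named columns so no
# duplicate names reach the downstream code
joined = (df1.alias("a")
    .join(df2.alias("b"), on=["index", "date"])
    .select("index", "date",
            F.col("a.amount").alias("amount1"), F.col("a.dept").alias("dept1"),
            F.col("b.amount").alias("amount2"), F.col("b.dept").alias("dept2")))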

That being said, you can create a helper spark function after building a helper dictionary and list of column names:

from pyspark.sql import functions as F
from itertools import groupby

Create a new list of column names with a counter for duplicates:

l = []
s = {}
for i in df.columns:
    l.append(f"{i}_{s.get(i)}" if i in s else i)
    s[i] = s.get(i,0)+1
#['index', 'amount', 'dept', 'date', 'amount_1', 'dept_1', 'date_1']

Then use this new list to create a dataframe from the existing dataframe, and use the helper function to concatenate columns based on the duplicate check:

def mysparkfunc(cols):
    cols = [list(v) for k,v in groupby(sorted(cols),lambda x: x.split("_")[0])]
    return [F.concat_ws(",",*col).alias(col[0]) 
            if len(col)>1 and col[0]!= 'date' 
            else F.col(col[0]) for col in cols]

df.toDF(*l).select(*mysparkfunc(l)).show()

+---------+------+------------+-----+
|   amount|  date|        dept|index|
+---------+------+------------+-----+
|1000,2000|2-4-21|  acnt,acnt2|    1|
|1500,1600|2-3-21|sales,sales2|    2|
+---------+------+------------+-----+
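
The columns come out sorted because the helper sorts the names before grouping; if you want the original order, chain another select, e.g.:

df.toDF(*l).select(*mysparkfunc(l)).select('index', 'amount', 'dept', 'date').show()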

Full code:

from pyspark.sql import functions as F
from itertools import groupby

l = []
s = {}
for i in df.columns:
    l.append(f"{i}_{s.get(i)}" if i in s else i)
    s[i] = s.get(i,0)+1
def mysparkfunc(cols):
    cols = [list(v) for k,v in groupby(sorted(cols),lambda x: x.split("_")[0])]
    return [F.concat_ws(",",*col).alias(col[0]) 
            if len(col)>1 and col[0]!= 'date' 
            else F.col(col[0]) for col in cols]

df.toDF(*l).select(*mysparkfunc(l)).show()

Suppose you have an initial dataframe like the one below:

INPUT:

+------+------+------+------+
|  dept|  dept|amount|amount|
+------+------+------+------+
|sales1|sales2|     1|     1|
|sales1|sales2|     2|     2|
|sales1|sales2|     3|     3|
|sales1|sales2|     4|     4|
|sales1|sales2|     5|     5|
+------+------+------+------+
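
(For reference, this sample input could be built with something like the sketch below; the duplicate column names are intentional, and an active SparkSession named spark is assumed.)

# hypothetical setup code to reproduce the sample frame with duplicate column names
data = [("sales1", "sales2", i, i) for i in range(1, 6)]
df = spark.createDataFrame(data, ["dept", "dept", "amount", "amount"])
df.show()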
  1. Rename the columns:
newColumns = ["dept1","dept2","amount1","amount2"]    
new_clms_df = df.toDF(*newColumns)
new_clms_df.show()

    +------+------+-------+-------+
    | dept1| dept2|amount1|amount2|
    +------+------+-------+-------+
    |sales1|sales2|      1|      1|
    |sales1|sales2|      2|      2|
    |sales1|sales2|      3|      3|
    |sales1|sales2|      4|      4|
    |sales1|sales2|      5|      5|
    +------+------+-------+-------+
  2. Derive the final output columns:
from pyspark.sql.functions import concat_ws

final_df = new_clms_df.\
            withColumn('dept', concat_ws(',', new_clms_df['dept1'], new_clms_df['dept2'])).\
            withColumn('amount', concat_ws(',', new_clms_df['amount1'], new_clms_df['amount2']))
final_df.show()


+------+------+-------+-------+-------------+------+
| dept1| dept2|amount1|amount2|         dept|amount|
+------+------+-------+-------+-------------+------+
|sales1|sales2|      1|      1|sales1,sales2|   1,1|
|sales1|sales2|      2|      2|sales1,sales2|   2,2|
|sales1|sales2|      3|      3|sales1,sales2|   3,3|
|sales1|sales2|      4|      4|sales1,sales2|   4,4|
|sales1|sales2|      5|      5|sales1,sales2|   5,5|
+------+------+-------+-------+-------------+------+
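
If you only want the combined columns in the final output, the intermediate ones can then be dropped, e.g.:

final_df.drop('dept1', 'dept2', 'amount1', 'amount2').show()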