重命名 PySpark DataFrame 聚合的列
Renaming columns for PySpark DataFrame aggregates
我正在使用 PySpark DataFrames 分析一些数据。假设我有一个正在聚合的 DataFrame df
:
(df.groupBy("group")
.agg({"money":"sum"})
.show(100)
)
这会给我:
group SUM(money#2L)
A 137461285853
B 172185566943
C 271179590646
聚合工作正常,但我不喜欢新的列名称 SUM(money#2L)
。有没有办法通过 .agg
方法将此列重命名为人类可读的名称?也许更类似于 dplyr
:
df %>% group_by(group) %>% summarise(sum_money = sum(money))
withColumnRenamed
应该可以解决问题。这是 link 到 pyspark.sql API.
df.groupBy("group")\
.agg({"money":"sum"})\
.withColumnRenamed("SUM(money)", "money")
.show(100)
尽管我仍然更喜欢 dplyr 语法,但此代码片段可以:
import pyspark.sql.functions as sf
(df.groupBy("group")
.agg(sf.sum('money').alias('money'))
.show(100))
它变得冗长。
我为此做了一个小辅助函数,也许能帮到一些人。
import re
from functools import partial
def rename_cols(agg_df, ignore_first_n=1):
"""changes the default spark aggregate names `avg(colname)`
to something a bit more useful. Pass an aggregated dataframe
and the number of aggregation columns to ignore.
"""
delimiters = "(", ")"
split_pattern = '|'.join(map(re.escape, delimiters))
splitter = partial(re.split, split_pattern)
split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
renamed = map(split_agg, agg_df.columns[ignore_first_n:])
renamed = zip(agg_df.columns[ignore_first_n:], renamed)
for old, new in renamed:
agg_df = agg_df.withColumnRenamed(old, new)
return agg_df
一个例子:
gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
.groupby("id")
.agg({"rank": "mean",
"*": "count",
"rate": "mean",
"price": "mean",
"clicks": "mean",
})
)
>>> gb.columns
['id',
'avg(rate)',
'count(1)',
'avg(price)',
'avg(rank)',
'avg(clicks)']
>>> rename_cols(gb).columns
['id',
'avg_rate',
'count_1',
'avg_price',
'avg_rank',
'avg_clicks']
至少做了一些工作来避免人们打这么多字。
df = df.groupby('Device_ID').agg(aggregate_methods)
for column in df.columns:
start_index = column.find('(')
end_index = column.find(')')
if (start_index and end_index):
df = df.withColumnRenamed(column, column[start_index+1:end_index])
以上代码可以去除“()”之外的任何内容。例如,"sum(foo)" 将重命名为 "foo"。
很简单:
val maxVideoLenPerItemDf = requiredItemsFiltered.groupBy("itemId").agg(max("playBackDuration").as("customVideoLength"))
maxVideoLenPerItemDf.show()
在 agg 中使用 .as
命名创建的新行。
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName('test').getOrCreate()
data = [(1, "siva", 100), (2, "siva2", 200),(3, "siva3", 300),(4, "siva4", 400),(5, "siva5", 500)]
schema = ['id', 'name', 'sallary']
df = spark.createDataFrame(data, schema=schema)
df.show()
+---+-----+-------+
| id| name|sallary|
+---+-----+-------+
| 1| siva| 100|
| 2|siva2| 200|
| 3|siva3| 300|
| 4|siva4| 400|
| 5|siva5| 500|
+---+-----+-------+
**df.agg({"sallary": "max"}).withColumnRenamed('max(sallary)', 'max').show()**
+---+
|max|
+---+
|500|
+---+
虽然之前给出的答案很好,但我认为他们缺乏一种巧妙的方法来处理 .agg()
中的 dictionary-usage
如果你想使用一个字典,它实际上也可能是动态生成的,因为你有数百列,你可以使用以下内容而不用处理几十个 code-lines:
# Your dictionary-version of using the .agg()-function
# Note: The provided logic could actually also be applied to a non-dictionary approach
df = df.groupBy("group")\
.agg({
"money":"sum"
, "...": "..."
})
# Now do the renaming
newColumnNames = ["group", "money", "..."] # Provide the names for ALL columns of the new df
df = df.toDF(*newColumnNames) # Do the renaming
当然也可以动态生成newColumnNames
列表。例如,如果您仅将聚合中的列附加到 df
,则可以 pre-store newColumnNames = df.columns
然后仅附加其他名称。
无论如何,请注意 newColumnNames
必须包含数据框的所有列名,而不仅仅是要重命名的列名(因为 .toDF()
由于 Sparks 不可变 RDD 创建了一个新数据框)!
另一个快速的小班轮添加混合:
df.groupBy('group')
.agg({'money':'sum',
'moreMoney':'sum',
'evenMoreMoney':'sum'
})
.select(*(col(i).alias(i.replace("(",'_').replace(')','')) for i in df.columns))
只需将别名函数更改为您想要的名称即可。上面生成 sum_money、sum_moreMoney,因为我喜欢在变量名中看到运算符。
如果您愿意 hard-code 您的列名,.alias
和 .withColumnRenamed
都可以。如果您需要一个程序化的解决方案,例如所有剩余列的聚合的更友好的名称,这提供了一个很好的起点:
grouping_column = 'group'
cols = [F.sum(F.col(x)).alias(x) for x in df.columns if x != grouping_column]
(
df
.groupBy(grouping_column)
.agg(
*cols
)
)
我正在使用 PySpark DataFrames 分析一些数据。假设我有一个正在聚合的 DataFrame df
:
(df.groupBy("group")
.agg({"money":"sum"})
.show(100)
)
这会给我:
group SUM(money#2L)
A 137461285853
B 172185566943
C 271179590646
聚合工作正常,但我不喜欢新的列名称 SUM(money#2L)
。有没有办法通过 .agg
方法将此列重命名为人类可读的名称?也许更类似于 dplyr
:
df %>% group_by(group) %>% summarise(sum_money = sum(money))
withColumnRenamed
应该可以解决问题。这是 link 到 pyspark.sql API.
df.groupBy("group")\
.agg({"money":"sum"})\
.withColumnRenamed("SUM(money)", "money")
.show(100)
尽管我仍然更喜欢 dplyr 语法,但此代码片段可以:
import pyspark.sql.functions as sf
(df.groupBy("group")
.agg(sf.sum('money').alias('money'))
.show(100))
它变得冗长。
我为此做了一个小辅助函数,也许能帮到一些人。
import re
from functools import partial
def rename_cols(agg_df, ignore_first_n=1):
"""changes the default spark aggregate names `avg(colname)`
to something a bit more useful. Pass an aggregated dataframe
and the number of aggregation columns to ignore.
"""
delimiters = "(", ")"
split_pattern = '|'.join(map(re.escape, delimiters))
splitter = partial(re.split, split_pattern)
split_agg = lambda x: '_'.join(splitter(x))[0:-ignore_first_n]
renamed = map(split_agg, agg_df.columns[ignore_first_n:])
renamed = zip(agg_df.columns[ignore_first_n:], renamed)
for old, new in renamed:
agg_df = agg_df.withColumnRenamed(old, new)
return agg_df
一个例子:
gb = (df.selectExpr("id", "rank", "rate", "price", "clicks")
.groupby("id")
.agg({"rank": "mean",
"*": "count",
"rate": "mean",
"price": "mean",
"clicks": "mean",
})
)
>>> gb.columns
['id',
'avg(rate)',
'count(1)',
'avg(price)',
'avg(rank)',
'avg(clicks)']
>>> rename_cols(gb).columns
['id',
'avg_rate',
'count_1',
'avg_price',
'avg_rank',
'avg_clicks']
至少做了一些工作来避免人们打这么多字。
df = df.groupby('Device_ID').agg(aggregate_methods)
for column in df.columns:
start_index = column.find('(')
end_index = column.find(')')
if (start_index and end_index):
df = df.withColumnRenamed(column, column[start_index+1:end_index])
以上代码可以去除“()”之外的任何内容。例如,"sum(foo)" 将重命名为 "foo"。
很简单:
val maxVideoLenPerItemDf = requiredItemsFiltered.groupBy("itemId").agg(max("playBackDuration").as("customVideoLength"))
maxVideoLenPerItemDf.show()
在 agg 中使用 .as
命名创建的新行。
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName('test').getOrCreate()
data = [(1, "siva", 100), (2, "siva2", 200),(3, "siva3", 300),(4, "siva4", 400),(5, "siva5", 500)]
schema = ['id', 'name', 'sallary']
df = spark.createDataFrame(data, schema=schema)
df.show()
+---+-----+-------+
| id| name|sallary|
+---+-----+-------+
| 1| siva| 100|
| 2|siva2| 200|
| 3|siva3| 300|
| 4|siva4| 400|
| 5|siva5| 500|
+---+-----+-------+
**df.agg({"sallary": "max"}).withColumnRenamed('max(sallary)', 'max').show()**
+---+
|max|
+---+
|500|
+---+
虽然之前给出的答案很好,但我认为他们缺乏一种巧妙的方法来处理 .agg()
如果你想使用一个字典,它实际上也可能是动态生成的,因为你有数百列,你可以使用以下内容而不用处理几十个 code-lines:
# Your dictionary-version of using the .agg()-function
# Note: The provided logic could actually also be applied to a non-dictionary approach
df = df.groupBy("group")\
.agg({
"money":"sum"
, "...": "..."
})
# Now do the renaming
newColumnNames = ["group", "money", "..."] # Provide the names for ALL columns of the new df
df = df.toDF(*newColumnNames) # Do the renaming
当然也可以动态生成newColumnNames
列表。例如,如果您仅将聚合中的列附加到 df
,则可以 pre-store newColumnNames = df.columns
然后仅附加其他名称。
无论如何,请注意 newColumnNames
必须包含数据框的所有列名,而不仅仅是要重命名的列名(因为 .toDF()
由于 Sparks 不可变 RDD 创建了一个新数据框)!
另一个快速的小班轮添加混合:
df.groupBy('group')
.agg({'money':'sum',
'moreMoney':'sum',
'evenMoreMoney':'sum'
})
.select(*(col(i).alias(i.replace("(",'_').replace(')','')) for i in df.columns))
只需将别名函数更改为您想要的名称即可。上面生成 sum_money、sum_moreMoney,因为我喜欢在变量名中看到运算符。
.alias
和 .withColumnRenamed
都可以。如果您需要一个程序化的解决方案,例如所有剩余列的聚合的更友好的名称,这提供了一个很好的起点:
grouping_column = 'group'
cols = [F.sum(F.col(x)).alias(x) for x in df.columns if x != grouping_column]
(
df
.groupBy(grouping_column)
.agg(
*cols
)
)