如何在 pyspark groupby 上使用带有 pandas 的 UDF?
How to use UDFs with pandas on pyspark groupby?
我很难在 pyspark 上的 pandas 上使用 pandas UDF。你能帮我理解这是如何实现的吗?以下是我的尝试:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark import pandas as ps
# Start (or reuse) a Spark session and build a small pandas-on-Spark frame.
spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame(
    {
        'A': ['a', 'a', 'b'],
        'B': [1, 2, 3],
        'C': [4, 6, 5],
    },
    columns=['A', 'B', 'C'],
)


@pandas_udf('float')
def agg_a(x):
    """Return the mean of the squared values of ``x``."""
    squared = x ** 2
    return squared.mean()


@pandas_udf('float')
def agg_b(x):
    """Return the mean of ``x``."""
    return x.mean()


# Register both UDFs under SQL names so they can be referenced by string.
spark.udf.register('agg_a_', agg_a)
spark.udf.register('agg_b_', agg_b)

# Per group of A: aggregate B with agg_a_ and C with agg_b_.
# This is the attempt that produces the AnalysisException quoted after
# the snippet.
df_means = df.groupby('A')
dfout = df_means.agg({'B': 'agg_a_', 'C': 'agg_b_'})
这导致了一个我很难理解的异常:
AnalysisException: expression 'B' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
Aggregate [__index_level_0__#14], [__index_level_0__#14, agg_a_(B#2L) AS B#15, agg_b_(C#3L) AS C#16]
+- Project [A#1 AS __index_level_0__#14, A#1, B#2L, C#3L]
+- Project [__index_level_0__#0L, A#1, B#2L, C#3L, monotonically_increasing_id() AS __natural_order__#8L]
+- LogicalRDD [__index_level_0__#0L, A#1, B#2L, C#3L], false
我也尝试过使用 udf 而不是 pandas_udf,但同样因为相同的异常而失败。
我也尝试仅在一列上使用带有 UDF 的 groupby,但这也失败了:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark import pandas as ps
# Start (or reuse) a Spark session and build a small pandas-on-Spark frame.
spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame(
    {
        'A': ['a', 'a', 'b'],
        'B': [1, 2, 3],
        'C': [4, 6, 5],
    },
    columns=['A', 'B', 'C'],
)


@udf('float')
def agg_a(x):
    """Return ``(x**2).mean()``; declared as a plain (non-pandas) UDF."""
    squared = x ** 2
    return squared.mean()


@udf('float')
def agg_b(x):
    """Return ``x.mean()``; declared as a plain (non-pandas) UDF."""
    return x.mean()


# Register both UDFs under SQL names so they can be referenced by string.
spark.udf.register('agg_a_', agg_a)
spark.udf.register('agg_b_', agg_b)

# Select only column B of each group and aggregate it by UDF name.
# This is the attempt that produces the PandasNotImplementedError quoted
# after the snippet.
df_means = df.groupby('A')['B']
dfout = df_means.agg('agg_a_')
输出:
PandasNotImplementedError: The method `pd.groupby.GroupBy.agg()` is not implemented yet.
我猜这并不属实:如果我不使用 UDF,而是使用已经定义的函数(如 'min'、'max'),就可以正常使用 groupby。
我尝试在不按列指定不同 UDF 的情况下使用,但也失败了:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark import pandas as ps
# Start (or reuse) a Spark session and build a small pandas-on-Spark frame.
spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame(
    {
        'A': ['a', 'a', 'b'],
        'B': [1, 2, 3],
        'C': [4, 6, 5],
    },
    columns=['A', 'B', 'C'],
)


@udf('float')
def agg_a(x):
    """Return ``(x**2).mean()``; declared as a plain (non-pandas) UDF."""
    squared = x ** 2
    return squared.mean()


@udf('float')
def agg_b(x):
    """Return ``x.mean()``; declared as a plain (non-pandas) UDF."""
    return x.mean()


# Register both UDFs under SQL names so they can be referenced by string.
spark.udf.register('agg_a_', agg_a)
spark.udf.register('agg_b_', agg_b)

# Apply the single UDF name agg_a_ to every non-grouping column.
# This is the attempt that produces the AnalysisException quoted after
# the snippet.
df_means = df.groupby('A')
dfout = df_means.agg('agg_a_')
输出:
AnalysisException: expression 'B' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
Aggregate [__index_level_0__#14], [__index_level_0__#14, agg_a_(B#2L) AS B#15, agg_a_(C#3L) AS C#16]
+- Project [A#1 AS __index_level_0__#14, A#1, B#2L, C#3L]
+- Project [__index_level_0__#0L, A#1, B#2L, C#3L, monotonically_increasing_id() AS __natural_order__#8L]
+- LogicalRDD [__index_level_0__#0L, A#1, B#2L, C#3L], false
根据 GroupedData.agg 的文档,您需要使用 PandasUDFType 来定义您的 pandas_udf。如果您需要的是聚合,那么应当使用 PandasUDFType.GROUPED_AGG。
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_a(x):
    """Grouped-aggregate pandas UDF: mean of the squared values of ``x``."""
    squared = x ** 2
    return squared.mean()


@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_b(x):
    """Grouped-aggregate pandas UDF: mean of ``x``."""
    return x.mean()


# Register both UDFs under SQL names so agg() can reference them by string.
spark.udf.register('agg_a_', agg_a)
spark.udf.register('agg_b_', agg_b)

# Per group of A: aggregate B with agg_a_ and C with agg_b_, then print.
(
    df.groupby('A')
    .agg({'B': 'agg_a_', 'C': 'agg_b_'})
    .show()
)
# +---+---------+---------+
# | A|agg_a_(B)|agg_b_(C)|
# +---+---------+---------+
# | b| 9.0| 5.0|
# | a| 2.5| 5.0|
# +---+---------+---------+
我很难在 pyspark 上的 pandas 上使用 pandas UDF。你能帮我理解这是如何实现的吗?以下是我的尝试:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark import pandas as ps
# Start (or reuse) a Spark session and build a small pandas-on-Spark frame.
spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame(
    {
        'A': ['a', 'a', 'b'],
        'B': [1, 2, 3],
        'C': [4, 6, 5],
    },
    columns=['A', 'B', 'C'],
)


@pandas_udf('float')
def agg_a(x):
    """Return the mean of the squared values of ``x``."""
    squared = x ** 2
    return squared.mean()


@pandas_udf('float')
def agg_b(x):
    """Return the mean of ``x``."""
    return x.mean()


# Register both UDFs under SQL names so they can be referenced by string.
spark.udf.register('agg_a_', agg_a)
spark.udf.register('agg_b_', agg_b)

# Per group of A: aggregate B with agg_a_ and C with agg_b_.
# This is the attempt that produces the AnalysisException quoted after
# the snippet.
df_means = df.groupby('A')
dfout = df_means.agg({'B': 'agg_a_', 'C': 'agg_b_'})
这导致了一个我很难理解的异常:
AnalysisException: expression 'B' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
Aggregate [__index_level_0__#14], [__index_level_0__#14, agg_a_(B#2L) AS B#15, agg_b_(C#3L) AS C#16]
+- Project [A#1 AS __index_level_0__#14, A#1, B#2L, C#3L]
+- Project [__index_level_0__#0L, A#1, B#2L, C#3L, monotonically_increasing_id() AS __natural_order__#8L]
+- LogicalRDD [__index_level_0__#0L, A#1, B#2L, C#3L], false
我也尝试过使用 udf 而不是 pandas_udf,但同样因为相同的异常而失败。
我也尝试仅在一列上使用带有 UDF 的 groupby,但这也失败了:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark import pandas as ps
# Start (or reuse) a Spark session and build a small pandas-on-Spark frame.
spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame(
    {
        'A': ['a', 'a', 'b'],
        'B': [1, 2, 3],
        'C': [4, 6, 5],
    },
    columns=['A', 'B', 'C'],
)


@udf('float')
def agg_a(x):
    """Return ``(x**2).mean()``; declared as a plain (non-pandas) UDF."""
    squared = x ** 2
    return squared.mean()


@udf('float')
def agg_b(x):
    """Return ``x.mean()``; declared as a plain (non-pandas) UDF."""
    return x.mean()


# Register both UDFs under SQL names so they can be referenced by string.
spark.udf.register('agg_a_', agg_a)
spark.udf.register('agg_b_', agg_b)

# Select only column B of each group and aggregate it by UDF name.
# This is the attempt that produces the PandasNotImplementedError quoted
# after the snippet.
df_means = df.groupby('A')['B']
dfout = df_means.agg('agg_a_')
输出:
PandasNotImplementedError: The method `pd.groupby.GroupBy.agg()` is not implemented yet.
我猜这并不属实:如果我不使用 UDF,而是使用已经定义的函数(如 'min'、'max'),就可以正常使用 groupby。
我尝试在不按列指定不同 UDF 的情况下使用,但也失败了:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark import pandas as ps
# Start (or reuse) a Spark session and build a small pandas-on-Spark frame.
spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame(
    {
        'A': ['a', 'a', 'b'],
        'B': [1, 2, 3],
        'C': [4, 6, 5],
    },
    columns=['A', 'B', 'C'],
)


@udf('float')
def agg_a(x):
    """Return ``(x**2).mean()``; declared as a plain (non-pandas) UDF."""
    squared = x ** 2
    return squared.mean()


@udf('float')
def agg_b(x):
    """Return ``x.mean()``; declared as a plain (non-pandas) UDF."""
    return x.mean()


# Register both UDFs under SQL names so they can be referenced by string.
spark.udf.register('agg_a_', agg_a)
spark.udf.register('agg_b_', agg_b)

# Apply the single UDF name agg_a_ to every non-grouping column.
# This is the attempt that produces the AnalysisException quoted after
# the snippet.
df_means = df.groupby('A')
dfout = df_means.agg('agg_a_')
输出:
AnalysisException: expression 'B' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
Aggregate [__index_level_0__#14], [__index_level_0__#14, agg_a_(B#2L) AS B#15, agg_a_(C#3L) AS C#16]
+- Project [A#1 AS __index_level_0__#14, A#1, B#2L, C#3L]
+- Project [__index_level_0__#0L, A#1, B#2L, C#3L, monotonically_increasing_id() AS __natural_order__#8L]
+- LogicalRDD [__index_level_0__#0L, A#1, B#2L, C#3L], false
根据 GroupedData.agg 的文档,您需要使用 PandasUDFType 来定义您的 pandas_udf。如果您需要的是聚合,那么应当使用 PandasUDFType.GROUPED_AGG。
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_a(x):
    """Grouped-aggregate pandas UDF: mean of the squared values of ``x``."""
    squared = x ** 2
    return squared.mean()


@pandas_udf('float', PandasUDFType.GROUPED_AGG)
def agg_b(x):
    """Grouped-aggregate pandas UDF: mean of ``x``."""
    return x.mean()


# Register both UDFs under SQL names so agg() can reference them by string.
spark.udf.register('agg_a_', agg_a)
spark.udf.register('agg_b_', agg_b)

# Per group of A: aggregate B with agg_a_ and C with agg_b_, then print.
(
    df.groupby('A')
    .agg({'B': 'agg_a_', 'C': 'agg_b_'})
    .show()
)
# +---+---------+---------+
# | A|agg_a_(B)|agg_b_(C)|
# +---+---------+---------+
# | b| 9.0| 5.0|
# | a| 2.5| 5.0|
# +---+---------+---------+