PySpark applyinpands/grouped_map pandas_udf 参数太多
PySpark applyinpands/grouped_map pandas_udf too many arguments
我正在尝试在我的 python 代码中使用 pyspark applyInPandas
。问题是,我要传递给它的函数存在于同一个 class 中,因此它被定义为 def func(self, key, df)
。这成为一个问题,因为 applyInPandas
会出错,说我向底层函数传递了太多参数(最多它允许 key
和 df
参数,所以 self
导致了这个问题)。有什么解决办法吗?
基本目标是在数据帧组上并行处理 pandas 函数。
如 OP 所述,一种方法是仅使用 @staticmethod
,在某些情况下可能并不理想。
用于创建 pandas_udf 的 pyspark source code 使用 inspect.getfullargspec().args
(第 386、436 行),这包括 self
,即使 class 方法是从实例。我认为这是他们的一个错误(也许值得提出一个问题)。
为了克服这个问题,最简单的方法是使用 functools.partial
这可以帮助更改 argspec,即删除 self
参数并将 args 的数量恢复为 2。
这是基于调用实例方法与直接从 class 调用方法相同并将实例作为第一个参数提供(由于描述符魔法)的想法:
A.func(A(), *args, **kwargs) == A().func(*args, **kwargs)
举个具体的例子,
import functools
import inspect
class A:
def __init__(self, y):
self.y = y
def sum(self, a: int, b: int):
return (a + b) * self.y
def x(self):
# calling the method using the class and then supply the self argument
f = functools.partial(A.sum, self)
print(f(1, 2))
print(inspect.getfullargspec(f).args)
A(2).x()
这将打印
6 # can still use 'self.y'
['a', 'b'] # 2 arguments (without 'self')
然后,在 OP 的情况下,可以简单地对 key, df
参数执行相同的操作:
class A:
def __init__(self):
...
def func(self, key, df):
...
def x(self):
f = functools.partial(A.func, self)
self.df.groupby(...).applyInPandas(f)
我正在尝试在我的 python 代码中使用 pyspark applyInPandas
。问题是,我要传递给它的函数存在于同一个 class 中,因此它被定义为 def func(self, key, df)
。这成为一个问题,因为 applyInPandas
会出错,说我向底层函数传递了太多参数(最多它允许 key
和 df
参数,所以 self
导致了这个问题)。有什么解决办法吗?
基本目标是在数据帧组上并行处理 pandas 函数。
如 OP 所述,一种方法是仅使用 @staticmethod
,在某些情况下可能并不理想。
用于创建 pandas_udf 的 pyspark source code 使用 inspect.getfullargspec().args
(第 386、436 行),这包括 self
,即使 class 方法是从实例。我认为这是他们的一个错误(也许值得提出一个问题)。
为了克服这个问题,最简单的方法是使用 functools.partial
这可以帮助更改 argspec,即删除 self
参数并将 args 的数量恢复为 2。
这是基于调用实例方法与直接从 class 调用方法相同并将实例作为第一个参数提供(由于描述符魔法)的想法:
A.func(A(), *args, **kwargs) == A().func(*args, **kwargs)
举个具体的例子,
import functools
import inspect
class A:
def __init__(self, y):
self.y = y
def sum(self, a: int, b: int):
return (a + b) * self.y
def x(self):
# calling the method using the class and then supply the self argument
f = functools.partial(A.sum, self)
print(f(1, 2))
print(inspect.getfullargspec(f).args)
A(2).x()
这将打印
6 # can still use 'self.y'
['a', 'b'] # 2 arguments (without 'self')
然后,在 OP 的情况下,可以简单地对 key, df
参数执行相同的操作:
class A:
def __init__(self):
...
def func(self, key, df):
...
def x(self):
f = functools.partial(A.func, self)
self.df.groupby(...).applyInPandas(f)