PySpark applyinpands/grouped_map pandas_udf 参数太多

PySpark applyinpands/grouped_map pandas_udf too many arguments

我正在尝试在我的 python 代码中使用 pyspark applyInPandas。问题是,我要传递给它的函数存在于同一个 class 中,因此它被定义为 def func(self, key, df)。这成为一个问题,因为 applyInPandas 会出错,说我向底层函数传递了太多参数(最多它允许 keydf 参数,所以 self 导致了这个问题)。有什么解决办法吗?

基本目标是在数据帧组上并行处理 pandas 函数。

如 OP 所述,一种方法是仅使用 @staticmethod,在某些情况下可能并不理想。

用于创建 pandas_udf 的 pyspark source code 使用 inspect.getfullargspec().args(第 386、436 行),这包括 self,即使 class 方法是从实例。我认为这是他们的一个错误(也许值得提出一个问题)。

为了克服这个问题,最简单的方法是使用 functools.partial 这可以帮助更改 argspec,即删除 self 参数并将 args 的数量恢复为 2。

这是基于调用实例方法与直接从 class 调用方法相同并将实例作为第一个参数提供(由于描述符魔法)的想法:

A.func(A(), *args, **kwargs) == A().func(*args, **kwargs)

举个具体的例子,

import functools
import inspect


class A:
    def __init__(self, y):
        self.y = y

    def sum(self, a: int, b: int):
        return (a + b) * self.y

    def x(self):
        # calling the method using the class and then supply the self argument
        f = functools.partial(A.sum, self)  
        print(f(1, 2))
        print(inspect.getfullargspec(f).args)
        
A(2).x()

这将打印

6           # can still use 'self.y'
['a', 'b']  # 2 arguments (without 'self')

然后,在 OP 的情况下,可以简单地对 key, df 参数执行相同的操作:

class A:
    def __init__(self):
        ...

    def func(self, key, df):
        ...

    def x(self):
        f = functools.partial(A.func, self)
        self.df.groupby(...).applyInPandas(f)