Class 方法作为 Pyspark udf

Class methods as Pyspark udf

我有以下代码

import numpy as np
import pandas as pd

class MyClass:
    def __init__(self, a: pd.Series):
        self.a = a

    def f(self, b: pd.Series):
        return np.exp(a) + b

我还有一个包含双列 ab 的 Pyspark 数据框。我要运行

df.withColumn('c', MyClass(df['a']).f(df['b']))

这当然失败了。我如何正确调整 MyClass 的代码以使其工作。 (请注意,我不能简单地根据 Pyspark 函数编写函数 f

您可以添加一个 UDF 来包装 class:

import pyspark.sql.functions as F
import pandas as pd
import numpy as np

class MyClass:
    def __init__(self, a: pd.Series):
        self.a = a
    def f(self, b: pd.Series):
        return np.exp(self.a) + b

@F.pandas_udf('float')
def myClassUDF(a: pd.Series, b: pd.Series) -> pd.Series:
    return MyClass(a).f(b)

df = spark.createDataFrame([[0,1], [0,2]],['a','b'])

df.withColumn('c', myClassUDF('a','b')).show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  0|  1|2.0|
|  0|  2|3.0|
+---+---+---+