Class 方法作为 Pyspark udf
Class methods as Pyspark udf
我有以下代码
import numpy as np
import pandas as pd
class MyClass:
def __init__(self, a: pd.Series):
self.a = a
def f(self, b: pd.Series):
return np.exp(a) + b
我还有一个包含双列 a
和 b
的 Pyspark 数据框。我要运行
df.withColumn('c', MyClass(df['a']).f(df['b']))
这当然失败了。我如何正确调整 MyClass
的代码以使其工作。 (请注意,我不能简单地根据 Pyspark 函数编写函数 f
。
您可以添加一个 UDF 来包装 class:
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
class MyClass:
def __init__(self, a: pd.Series):
self.a = a
def f(self, b: pd.Series):
return np.exp(self.a) + b
@F.pandas_udf('float')
def myClassUDF(a: pd.Series, b: pd.Series) -> pd.Series:
return MyClass(a).f(b)
df = spark.createDataFrame([[0,1], [0,2]],['a','b'])
df.withColumn('c', myClassUDF('a','b')).show()
+---+---+---+
| a| b| c|
+---+---+---+
| 0| 1|2.0|
| 0| 2|3.0|
+---+---+---+
我有以下代码
import numpy as np
import pandas as pd
class MyClass:
def __init__(self, a: pd.Series):
self.a = a
def f(self, b: pd.Series):
return np.exp(a) + b
我还有一个包含双列 a
和 b
的 Pyspark 数据框。我要运行
df.withColumn('c', MyClass(df['a']).f(df['b']))
这当然失败了。我如何正确调整 MyClass
的代码以使其工作。 (请注意,我不能简单地根据 Pyspark 函数编写函数 f
。
您可以添加一个 UDF 来包装 class:
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
class MyClass:
def __init__(self, a: pd.Series):
self.a = a
def f(self, b: pd.Series):
return np.exp(self.a) + b
@F.pandas_udf('float')
def myClassUDF(a: pd.Series, b: pd.Series) -> pd.Series:
return MyClass(a).f(b)
df = spark.createDataFrame([[0,1], [0,2]],['a','b'])
df.withColumn('c', myClassUDF('a','b')).show()
+---+---+---+
| a| b| c|
+---+---+---+
| 0| 1|2.0|
| 0| 2|3.0|
+---+---+---+