在 Palantir Foundry 代码存储库中定义 Pandas UDF 的正确方法是什么
What is the proper way to define a Pandas UDF in a Palantir Foundry Code Repository
我想在 Palantir Foundry 代码存储库中定义以下 pandas_udf。
@pandas_udf("long", PandasUDFType.GROUPED_AGG)
def percentile_95_udf(v):
return v.quantile(0.95)
但是当我尝试在全局范围内定义此 udf 时,出现错误:
AttributeError: 'NoneType' object has no attribute '_jvm'
但是,如果我在转换调用的函数中定义相同的函数,代码运行正常,如:
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from transforms.api import transform, Input, Output
@transform(
data_out=Output("output path"),
data_in=Input("input path")
)
def percentile_95_transform(data_in, data_out):
data_out.write_dataframe(percentile_95(data_in.dataframe()))
def percentile_95(df):
@pandas_udf("long", PandasUDFType.GROUPED_AGG)
def percentile_95_udf(v):
return v.quantile(0.95)
# group rows for each interface into 1 day periods
grp_by = df.groupBy(df.objectId, F.window("TimeCaptured", "1 day"))
stats = [
percentile_95_udf(df.ReceivedWidgets),
percentile_95_udf(df.TransmittedWidgets),
]
result = grp_by.agg(*stats)
cleaned = result.withColumn("Day", F.col("window").start).drop("window")
return cleaned
为什么我的 pandas_udf 在全局范围内不起作用,但在另一个函数中定义时却起作用?另外,是否有更好的方法来定义 pandas_udf?将其定义为嵌套函数使我无法重用我的 udf。
作为参考,我在 Palantir Foundry 中的代码存储库具有以下结构:
transforms-python
conda_recipe
meta.yaml
src
myproject
datasets
__init__.py
percentile_95.py
__init__.py
pipeline.py
setup.cfg
setup.py
原因和这个问题的根源类似:PySpark error: AttributeError: 'NoneType' object has no attribute '_jvm'
当您在全局级别进行调用时,您正在尝试在设置 spark 之前执行 spark 命令(在您的情况下通过 pandas)。当您在转换中进行调用时,spark 可用,因此它可以工作。
这里的主要问题是在顶层调用注释本身,而 spark 仅在转换运行时设置。当您从 def percentile_95(df):
中调用它时,您实际上是在此处的转换中调用注释:
@transform(
data_out=Output("output path"),
data_in=Input("input path")
)
def percentile_95_transform(data_in, data_out):
data_out.write_dataframe(
percentile_95( # <-- here we're inside a transform
data_in.dataframe()))
如果你想在多个地方重用这些 UDF,也许你可以将它们包装在一个函数或一个 class 中,你在你想使用的每个转换中初始化。
我想在 Palantir Foundry 代码存储库中定义以下 pandas_udf。
@pandas_udf("long", PandasUDFType.GROUPED_AGG)
def percentile_95_udf(v):
return v.quantile(0.95)
但是当我尝试在全局范围内定义此 udf 时,出现错误:
AttributeError: 'NoneType' object has no attribute '_jvm'
但是,如果我在转换调用的函数中定义相同的函数,代码运行正常,如:
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from transforms.api import transform, Input, Output
@transform(
data_out=Output("output path"),
data_in=Input("input path")
)
def percentile_95_transform(data_in, data_out):
data_out.write_dataframe(percentile_95(data_in.dataframe()))
def percentile_95(df):
@pandas_udf("long", PandasUDFType.GROUPED_AGG)
def percentile_95_udf(v):
return v.quantile(0.95)
# group rows for each interface into 1 day periods
grp_by = df.groupBy(df.objectId, F.window("TimeCaptured", "1 day"))
stats = [
percentile_95_udf(df.ReceivedWidgets),
percentile_95_udf(df.TransmittedWidgets),
]
result = grp_by.agg(*stats)
cleaned = result.withColumn("Day", F.col("window").start).drop("window")
return cleaned
为什么我的 pandas_udf 在全局范围内不起作用,但在另一个函数中定义时却起作用?另外,是否有更好的方法来定义 pandas_udf?将其定义为嵌套函数使我无法重用我的 udf。
作为参考,我在 Palantir Foundry 中的代码存储库具有以下结构:
transforms-python
conda_recipe
meta.yaml
src
myproject
datasets
__init__.py
percentile_95.py
__init__.py
pipeline.py
setup.cfg
setup.py
原因和这个问题的根源类似:PySpark error: AttributeError: 'NoneType' object has no attribute '_jvm'
当您在全局级别进行调用时,您正在尝试在设置 spark 之前执行 spark 命令(在您的情况下通过 pandas)。当您在转换中进行调用时,spark 可用,因此它可以工作。
这里的主要问题是在顶层调用注释本身,而 spark 仅在转换运行时设置。当您从 def percentile_95(df):
中调用它时,您实际上是在此处的转换中调用注释:
@transform(
data_out=Output("output path"),
data_in=Input("input path")
)
def percentile_95_transform(data_in, data_out):
data_out.write_dataframe(
percentile_95( # <-- here we're inside a transform
data_in.dataframe()))
如果你想在多个地方重用这些 UDF,也许你可以将它们包装在一个函数或一个 class 中,你在你想使用的每个转换中初始化。