无法在没有 Spark 会话的情况下在 PySpark 项目中构建文档 运行
Can't build docs in PySpark project without Spark session running
我有一个 Python 包,其中包含一个模块,其中包含要在 PySpark 设置中使用的 UDF。我已经想出了一种方法来在 运行 进行单元测试时初始化和关闭 Spark 会话,但是我在创建文档时遇到了问题。我正在使用 Sphinx docs 所以我只是 运行ning make clean docs
和 运行ning 进入这个错误:
File "/usr/local/lib/python3.9/site-packages/pyspark/sql/pandas/functions.py", line 432, in _create_pandas_udf
return _create_udf(f, returnType, evalType)
File "/usr/local/lib/python3.9/site-packages/pyspark/sql/udf.py", line 43, in _create_udf
return udf_obj._wrapped()
File "/usr/local/lib/python3.9/site-packages/pyspark/sql/udf.py", line 206, in _wrapped
wrapper.returnType = self.returnType
File "/usr/local/lib/python3.9/site-packages/pyspark/sql/udf.py", line 96, in returnType
self._returnType_placeholder = _parse_datatype_string(self._returnType)
File "/usr/local/lib/python3.9/site-packages/pyspark/sql/types.py", line 843, in _parse_datatype_string
raise e
File "/usr/local/lib/python3.9/site-packages/pyspark/sql/types.py", line 833, in _parse_datatype_string
return from_ddl_schema(s)
File "/usr/local/lib/python3.9/site-packages/pyspark/sql/types.py", line 825, in from_ddl_schema
sc._jvm.org.apache.spark.sql.types.StructType.fromDDL(type_str).json())
AttributeError: 'NoneType' object has no attribute '_jvm'
错误当然是由于试图加载包含定义如下的 UDF 的模块引起的:
import pyspark.sql.functions as F
@F.pandas_udf(returnType='string')
def some_udf(col):
return col
据我了解,问题是无法创建 pandas_udf
,因为当我 运行 make clean docs
时没有可用的 Spark 会话。此库已导入并用于我们的 Databricks 集群,因此 Spark 会话已创建并在加载库时为您存在。我制作了一些其他的 UDF,它们同时接受静态值和列参数:
import pyspark.sql.functions as F
def outer_function(column_arg, integer_arg):
@F.pandas_udf(returnType='string')
def inner_function(column_arg):
return do_something_with(column_arg, integer_arg)
return inner_function(column_arg)
我是否应该重构我的所有 UDF 以像这样工作?似乎有点矫枉过正。
如果将函数装饰器中 returnType
的处理方式更改为 some_udf
,则可以避免错误。错误源于对函数装饰器的调用:
@F.pandas_udf(returnType='string')
将字符串 'string'
解析为 pyspark 数据类型导致错误。因此,如果您通过直接将数据类型提供为 StringType()
来避免解析字符串,则可以避免该错误。这可以通过修改函数装饰器来完成,如下所示:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
# Avoid the need to parse a 'string' into StringType()
@F.pandas_udf(returnType=StringType())
def some_udf(col):
return col
在我的测试中,在 python 中执行上述代码片段根本不需要 spark 会话。
如果您有多个形式为 F.pandas_udf(returnType='string')
的类似函数调用实例,其中字符串(例如:'string'
、'integer'
)被解析为 pyspark 数据类型,则所有这些实例都需要替换为数据类型对象(例如:StringType()
、IntegerType()
)。
除了 F.pandas_udf
之外,还有其他函数可以将字符串解析为 pyspark 数据类型,并且对所有此类函数的调用也将有助于您看到的 _jvm
AttributeError
。所有此类调用也需要修改以避免字符串解析。
侧边栏:例如,pyspark.sql.SparkSession.createDataFrame
函数也接受一个被解析为 pyspark 数据类型的字符串,尽管在这种情况下您明确需要一个 SparkSession
对象之前你甚至可以调用 createDataFrame
函数。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('PySparkShell').getOrCreate()
# 'x INT, y STRING' is parsed to pyspark data types
df = spark.createDataFrame([(x, str(x)) for x in range(5)], 'x INT, y STRING')
df.dtypes
# [('x', 'int'), ('y', 'string')]
理想情况下,F.pandas_udf
也应该 明确地 需要一个 SparkSession
对象,然后它让毫无戒心的用户提供数据类型信息作为 yet-to-be-parsed 字符串(例如:'string'
)但是,遗憾的是,事实并非如此。
我有一个 Python 包,其中包含一个模块,其中包含要在 PySpark 设置中使用的 UDF。我已经想出了一种方法来在 运行 进行单元测试时初始化和关闭 Spark 会话,但是我在创建文档时遇到了问题。我正在使用 Sphinx docs 所以我只是 运行ning make clean docs
和 运行ning 进入这个错误:
File "/usr/local/lib/python3.9/site-packages/pyspark/sql/pandas/functions.py", line 432, in _create_pandas_udf
return _create_udf(f, returnType, evalType)
File "/usr/local/lib/python3.9/site-packages/pyspark/sql/udf.py", line 43, in _create_udf
return udf_obj._wrapped()
File "/usr/local/lib/python3.9/site-packages/pyspark/sql/udf.py", line 206, in _wrapped
wrapper.returnType = self.returnType
File "/usr/local/lib/python3.9/site-packages/pyspark/sql/udf.py", line 96, in returnType
self._returnType_placeholder = _parse_datatype_string(self._returnType)
File "/usr/local/lib/python3.9/site-packages/pyspark/sql/types.py", line 843, in _parse_datatype_string
raise e
File "/usr/local/lib/python3.9/site-packages/pyspark/sql/types.py", line 833, in _parse_datatype_string
return from_ddl_schema(s)
File "/usr/local/lib/python3.9/site-packages/pyspark/sql/types.py", line 825, in from_ddl_schema
sc._jvm.org.apache.spark.sql.types.StructType.fromDDL(type_str).json())
AttributeError: 'NoneType' object has no attribute '_jvm'
错误当然是由于试图加载包含定义如下的 UDF 的模块引起的:
import pyspark.sql.functions as F
@F.pandas_udf(returnType='string')
def some_udf(col):
return col
据我了解,问题是无法创建 pandas_udf
,因为当我 运行 make clean docs
时没有可用的 Spark 会话。此库已导入并用于我们的 Databricks 集群,因此 Spark 会话已创建并在加载库时为您存在。我制作了一些其他的 UDF,它们同时接受静态值和列参数:
import pyspark.sql.functions as F
def outer_function(column_arg, integer_arg):
@F.pandas_udf(returnType='string')
def inner_function(column_arg):
return do_something_with(column_arg, integer_arg)
return inner_function(column_arg)
我是否应该重构我的所有 UDF 以像这样工作?似乎有点矫枉过正。
如果将函数装饰器中 returnType
的处理方式更改为 some_udf
,则可以避免错误。错误源于对函数装饰器的调用:
@F.pandas_udf(returnType='string')
将字符串 'string'
解析为 pyspark 数据类型导致错误。因此,如果您通过直接将数据类型提供为 StringType()
来避免解析字符串,则可以避免该错误。这可以通过修改函数装饰器来完成,如下所示:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
# Avoid the need to parse a 'string' into StringType()
@F.pandas_udf(returnType=StringType())
def some_udf(col):
return col
在我的测试中,在 python 中执行上述代码片段根本不需要 spark 会话。
如果您有多个形式为 F.pandas_udf(returnType='string')
的类似函数调用实例,其中字符串(例如:'string'
、'integer'
)被解析为 pyspark 数据类型,则所有这些实例都需要替换为数据类型对象(例如:StringType()
、IntegerType()
)。
除了 F.pandas_udf
之外,还有其他函数可以将字符串解析为 pyspark 数据类型,并且对所有此类函数的调用也将有助于您看到的 _jvm
AttributeError
。所有此类调用也需要修改以避免字符串解析。
侧边栏:例如,pyspark.sql.SparkSession.createDataFrame
函数也接受一个被解析为 pyspark 数据类型的字符串,尽管在这种情况下您明确需要一个 SparkSession
对象之前你甚至可以调用 createDataFrame
函数。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('PySparkShell').getOrCreate()
# 'x INT, y STRING' is parsed to pyspark data types
df = spark.createDataFrame([(x, str(x)) for x in range(5)], 'x INT, y STRING')
df.dtypes
# [('x', 'int'), ('y', 'string')]
理想情况下,F.pandas_udf
也应该 明确地 需要一个 SparkSession
对象,然后它让毫无戒心的用户提供数据类型信息作为 yet-to-be-parsed 字符串(例如:'string'
)但是,遗憾的是,事实并非如此。