无法在没有 Spark 会话的情况下在 PySpark 项目中构建文档 运行

Can't build docs in PySpark project without Spark session running

我有一个 Python 包,其中包含一个模块,其中包含要在 PySpark 设置中使用的 UDF。我已经想出了一种方法来在 运行 进行单元测试时初始化和关闭 Spark 会话,但是我在创建文档时遇到了问题。我正在使用 Sphinx docs 所以我只是 运行ning make clean docs 和 运行ning 进入这个错误:

  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/pandas/functions.py", line 432, in _create_pandas_udf
    return _create_udf(f, returnType, evalType)
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/udf.py", line 43, in _create_udf
    return udf_obj._wrapped()
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/udf.py", line 206, in _wrapped
    wrapper.returnType = self.returnType
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/udf.py", line 96, in returnType
    self._returnType_placeholder = _parse_datatype_string(self._returnType)
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/types.py", line 843, in _parse_datatype_string
    raise e
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/types.py", line 833, in _parse_datatype_string
    return from_ddl_schema(s)
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/types.py", line 825, in from_ddl_schema
    sc._jvm.org.apache.spark.sql.types.StructType.fromDDL(type_str).json())
AttributeError: 'NoneType' object has no attribute '_jvm'

错误当然是由于试图加载包含定义如下的 UDF 的模块引起的:

import pyspark.sql.functions as F

@F.pandas_udf(returnType='string')
def some_udf(col):
    return col

据我了解,问题是无法创建 pandas_udf,因为当我 运行 make clean docs 时没有可用的 Spark 会话。此库已导入并用于我们的 Databricks 集群,因此 Spark 会话已创建并在加载库时为您存在。我制作了一些其他的 UDF,它们同时接受静态值和列参数:

import pyspark.sql.functions as F

def outer_function(column_arg, integer_arg):

    @F.pandas_udf(returnType='string')
    def inner_function(column_arg):
        return do_something_with(column_arg, integer_arg)

    return inner_function(column_arg)

我是否应该重构我的所有 UDF 以像这样工作?似乎有点矫枉过正。

如果将函数装饰器中 returnType 的处理方式更改为 some_udf,则可以避免错误。错误源于对函数装饰器的调用:

@F.pandas_udf(returnType='string')

将字符串 'string' 解析为 pyspark 数据类型导致错误。因此,如果您通过直接将数据类型提供为 StringType() 来避免解析字符串,则可以避免该错误。这可以通过修改函数装饰器来完成,如下所示:

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

# Avoid the need to parse a 'string' into StringType()
@F.pandas_udf(returnType=StringType())
def some_udf(col):
    return col

在我的测试中,在 python 中执行上述代码片段根本不需要 spark 会话。

如果您有多个形式为 F.pandas_udf(returnType='string') 的类似函数调用实例,其中字符串(例如:'string''integer')被解析为 pyspark 数据类型,则所有这些实例都需要替换为数据类型对象(例如:StringType()IntegerType())。

除了 F.pandas_udf 之外,还有其他函数可以将字符串解析为 pyspark 数据类型,并且对所有此类函数的调用也将有助于您看到的 _jvm AttributeError。所有此类调用也需要修改以避免字符串解析。

侧边栏:例如,pyspark.sql.SparkSession.createDataFrame 函数也接受一个被解析为 pyspark 数据类型的字符串,尽管在这种情况下您明确需要一个 SparkSession 对象之前你甚至可以调用 createDataFrame 函数。

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('PySparkShell').getOrCreate()

# 'x INT, y STRING' is parsed to pyspark data types
df = spark.createDataFrame([(x, str(x)) for x in range(5)], 'x INT, y STRING')  

df.dtypes
# [('x', 'int'), ('y', 'string')]

理想情况下,F.pandas_udf 也应该 明确地 需要一个 SparkSession 对象,然后它让毫无戒心的用户提供数据类型信息作为 yet-to-be-parsed 字符串(例如:'string')但是,遗憾的是,事实并非如此。