如何将 pyspark UDF 导入 main class
How to import pyspark UDF into main class
我有两个文件。 functions.py
有一个函数并从该函数创建一个 pyspark udf。 main.py
尝试导入 udf。但是,main.py
似乎无法访问 functions.py
中的函数。
functions.py:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def do_something(x):
return x + 'hello'
sample_udf = udf(lambda x: do_something(x), StringType())
main.py:
from functions import sample_udf, do_something
df = spark.read.load(file)
df.withColumn("sample",sample_udf(col("text")))
这会导致错误:
17/10/03 19:35:29 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 6, ip-10-223-181-5.ec2.internal, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/pyspark/worker.py", line 164, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
File "/usr/lib/spark/python/pyspark/worker.py", line 93, in read_udfs
arg_offsets, udf = read_single_udf(pickleSer, infile)
File "/usr/lib/spark/python/pyspark/worker.py", line 79, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/usr/lib/spark/python/pyspark/worker.py", line 55, in read_command
command = serializer._read_with_length(file)
File "/usr/lib/spark/python/pyspark/serializers.py", line 169, in _read_with_length
return self.loads(obj)
File "/usr/lib/spark/python/pyspark/serializers.py", line 454, in loads
return pickle.loads(obj)
AttributeError: 'module' object has no attribute 'do_something'
如果我绕过 do_something
函数并将它放在 udf 中,例如:udf(lambda x: x + ' hello', StringType())
,UDF 导入很好 - 但我的函数有点长,如果有它封装在一个单独的函数中。实现此目标的正确方法是什么?
只需将此添加为答案:-
将您的 py 文件添加到 sparkcontext,以便您的执行者可以使用它。
sc.addPyFile("functions.py")
from functions import sample_udf
这是我的测试笔记本
谢谢,
查尔斯.
我认为更简洁的解决方案是使用 udf 装饰器来定义您的 udf 函数:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
@F.udf
def sample_udf(x):
return x + 'hello'
使用此解决方案,udf 不引用任何其他函数,您不需要在主代码中使用 sc.addPyFile
。
from functions import sample_udf, do_something
df = spark.read.load(file)
df.withColumn("sample",sample_udf(col("text")))
# It works :)
对于某些旧版本的 spark,装饰器不支持类型化的 udf,您可能必须按如下方式定义自定义装饰器:
import pyspark.sql.functions as F
import pyspark.sql.types as t
# Custom udf decorator which accept return type
def udf_typed(returntype=t.StringType()):
def _typed_udf_wrapper(func):
return F.udf(func, returntype)
return _typed_udf_wrapper
@udf_typed(t.IntegerType())
def my_udf(x)
return int(x)
我有两个文件。 functions.py
有一个函数并从该函数创建一个 pyspark udf。 main.py
尝试导入 udf。但是,main.py
似乎无法访问 functions.py
中的函数。
functions.py:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def do_something(x):
return x + 'hello'
sample_udf = udf(lambda x: do_something(x), StringType())
main.py:
from functions import sample_udf, do_something
df = spark.read.load(file)
df.withColumn("sample",sample_udf(col("text")))
这会导致错误:
17/10/03 19:35:29 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 6, ip-10-223-181-5.ec2.internal, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/pyspark/worker.py", line 164, in main
func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
File "/usr/lib/spark/python/pyspark/worker.py", line 93, in read_udfs
arg_offsets, udf = read_single_udf(pickleSer, infile)
File "/usr/lib/spark/python/pyspark/worker.py", line 79, in read_single_udf
f, return_type = read_command(pickleSer, infile)
File "/usr/lib/spark/python/pyspark/worker.py", line 55, in read_command
command = serializer._read_with_length(file)
File "/usr/lib/spark/python/pyspark/serializers.py", line 169, in _read_with_length
return self.loads(obj)
File "/usr/lib/spark/python/pyspark/serializers.py", line 454, in loads
return pickle.loads(obj)
AttributeError: 'module' object has no attribute 'do_something'
如果我绕过 do_something
函数并将它放在 udf 中,例如:udf(lambda x: x + ' hello', StringType())
,UDF 导入很好 - 但我的函数有点长,如果有它封装在一个单独的函数中。实现此目标的正确方法是什么?
只需将此添加为答案:-
将您的 py 文件添加到 sparkcontext,以便您的执行者可以使用它。
sc.addPyFile("functions.py")
from functions import sample_udf
这是我的测试笔记本
谢谢, 查尔斯.
我认为更简洁的解决方案是使用 udf 装饰器来定义您的 udf 函数:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
@F.udf
def sample_udf(x):
return x + 'hello'
使用此解决方案,udf 不引用任何其他函数,您不需要在主代码中使用 sc.addPyFile
。
from functions import sample_udf, do_something
df = spark.read.load(file)
df.withColumn("sample",sample_udf(col("text")))
# It works :)
对于某些旧版本的 spark,装饰器不支持类型化的 udf,您可能必须按如下方式定义自定义装饰器:
import pyspark.sql.functions as F
import pyspark.sql.types as t
# Custom udf decorator which accept return type
def udf_typed(returntype=t.StringType()):
def _typed_udf_wrapper(func):
return F.udf(func, returntype)
return _typed_udf_wrapper
@udf_typed(t.IntegerType())
def my_udf(x)
return int(x)