PySpark 抛出错误方法 __getnewargs__([]) 不存在
PySpark Throwing error Method __getnewargs__([]) does not exist
我有一组文件。文件的路径保存在文件中。例如 all_files.txt
。使用 apache spark,我需要对所有文件进行操作并合并结果。
我想做的步骤是:
- 通过读取
all_files.txt
创建一个 RDD
- 对于
all_files.txt
中的每一行(每一行都是某个文件的路径),
将每个文件的内容读入一个 RDD
- 然后对所有内容做一个操作
这是我为此编写的代码:
def return_contents_from_file (file_name):
return spark.read.text(file_name).rdd.map(lambda r: r[0])
def run_spark():
file_name = 'path_to_file'
spark = SparkSession \
.builder \
.appName("PythonWordCount") \
.getOrCreate()
counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file
.flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files
.flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files
这是抛出错误:
line 323, in get_return_value py4j.protocol.Py4JError: An error
occurred while calling o25.getnewargs. Trace: py4j.Py4JException:
Method getnewargs([]) does not exist at
py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at
py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272) at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79) at
py4j.GatewayConnection.run(GatewayConnection.java:214) at
java.lang.Thread.run(Thread.java:745)
有人可以告诉我我做错了什么以及我应该如何进一步处理。提前致谢。
在 flatMap
中使用 spark
或执行器上发生的任何转换是不允许的(spark
会话仅在驱动程序上可用)。也不可能创建 RDD 的 RDD(参见:)
但你可以用另一种方式实现这种转换——将 all_files.txt
的所有内容读入数据帧,使用 local map
使它们成为数据帧和 local reduce
合并所有,见示例:
>>> filenames = spark.read.text('all_files.txt').collect()
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)
我今天遇到这个问题,终于弄清楚我在pandas_udf
中引用了一个spark.DataFrame
对象,导致了这个错误。
结论:
您不能在 udf
和 pandas_udf
中使用 sparkSession
object 、 spark.DataFrame
object 或其他 Spark 分布式对象,因为它们未被 pickled。
如果您在使用 udf
时遇到此错误,请仔细检查,一定是相关问题。
当模型本身是 pyspark.ml.classification
模型时,我尝试使用 mlflow.sklearn.log_model
使用 MLFlow 记录我的模型时也遇到了这个错误。使用 mlflow.spark.log_model
解决了问题。
我有一组文件。文件的路径保存在文件中。例如 all_files.txt
。使用 apache spark,我需要对所有文件进行操作并合并结果。
我想做的步骤是:
- 通过读取
all_files.txt
创建一个 RDD
- 对于
all_files.txt
中的每一行(每一行都是某个文件的路径), 将每个文件的内容读入一个 RDD - 然后对所有内容做一个操作
这是我为此编写的代码:
def return_contents_from_file (file_name):
return spark.read.text(file_name).rdd.map(lambda r: r[0])
def run_spark():
file_name = 'path_to_file'
spark = SparkSession \
.builder \
.appName("PythonWordCount") \
.getOrCreate()
counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file
.flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files
.flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files
这是抛出错误:
line 323, in get_return_value py4j.protocol.Py4JError: An error occurred while calling o25.getnewargs. Trace: py4j.Py4JException: Method getnewargs([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:272) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:745)
有人可以告诉我我做错了什么以及我应该如何进一步处理。提前致谢。
在 flatMap
中使用 spark
或执行器上发生的任何转换是不允许的(spark
会话仅在驱动程序上可用)。也不可能创建 RDD 的 RDD(参见:
但你可以用另一种方式实现这种转换——将 all_files.txt
的所有内容读入数据帧,使用 local map
使它们成为数据帧和 local reduce
合并所有,见示例:
>>> filenames = spark.read.text('all_files.txt').collect()
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)
我今天遇到这个问题,终于弄清楚我在pandas_udf
中引用了一个spark.DataFrame
对象,导致了这个错误。
结论:
您不能在 udf
和 pandas_udf
中使用 sparkSession
object 、 spark.DataFrame
object 或其他 Spark 分布式对象,因为它们未被 pickled。
如果您在使用 udf
时遇到此错误,请仔细检查,一定是相关问题。
当模型本身是 pyspark.ml.classification
模型时,我尝试使用 mlflow.sklearn.log_model
使用 MLFlow 记录我的模型时也遇到了这个错误。使用 mlflow.spark.log_model
解决了问题。