当用户定义的对象从 Pyspark 中的映射操作返回时,会发生内置的属性查找代码失败错误

Attribute lookup Code on builtins failed error happens when a user defined object returned from map operation in Pyspark

我在 jupyterLab 中编写了以下 Pyspark 代码:

class Base:
    def __init__(self,line):
        self.line = line

def process_line(line):

    return Base(line)
input_path = 'the_path_of_input'
samples = sc.textFile(input_path).map(process_line)
print(samples.take(1))

我执行上面的代码时遇到了错误。以下是错误信息:

_pickle.PicklingError: Can't pickle <class 'Base'>: attribute lookup Base on builtins failed

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:588)
    at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:571)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$$anonfun.apply(PythonRDD.scala:153)
    at org.apache.spark.api.python.PythonRDD$$anonfun.apply(PythonRDD.scala:153)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2157)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2157)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

想了各种方法都试了,还是报错

根据此处的回答和评论:Python: Can't pickle type X, attribute lookup failed

看来你需要让 Base class 成为它自己的模块,然后它才能工作。 所以这里是 Base.py

class Base:
    def __init__(self,line):
        self.line = line

mainscript.py

from pyspark import SparkContext, SQLContext
from Base import Base

sc = SparkContext('local')
sqlContext = SQLContext(sc)

def process_line_baseclass(line):
    return Base(line)


input_path = 'path/to/inputfile'
samples = sc.textFile(input_path).map(process_line_baseclass)

print(samples.take(1))

输出:

[<Base.Base object at 0x7fc2188e64a8>]