"Attribute lookup Code on builtins failed" error when a user-defined object is returned from a map operation in PySpark
I wrote the following PySpark code in JupyterLab:
class Base:
    def __init__(self, line):
        self.line = line

def process_line(line):
    return Base(line)

input_path = 'the_path_of_input'
samples = sc.textFile(input_path).map(process_line)
print(samples.take(1))
When I ran the code above, I got an error. Here is the error message:
_pickle.PicklingError: Can't pickle <class 'Base'>: attribute lookup Base on builtins failed
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:588)
at org.apache.spark.api.python.PythonRunner$$anon.read(PythonRunner.scala:571)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.api.python.PythonRDD$$anonfun.apply(PythonRDD.scala:153)
at org.apache.spark.api.python.PythonRDD$$anonfun.apply(PythonRDD.scala:153)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2157)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2157)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I have tried every approach I could think of, but the error persists.
Based on the answer and comments here: Python: Can't pickle type X, attribute lookup failed
It seems you need to move the Base class into its own module for this to work.
So here is Base.py:
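For context, the error itself has nothing to do with Spark: pickle serializes an instance by recording its class's `__module__` and `__qualname__`, and the deserializing side must be able to re-import the class from that module. Classes defined interactively in a notebook can end up with a module name that resolves nowhere, which is exactly what the traceback shows. Here is a minimal reproduction outside Spark; forcing `__module__` by hand is an assumption for illustration, simulating what the notebook environment does:

```python
import pickle

class Base:
    def __init__(self, line):
        self.line = line

# Simulate a class whose recorded module does not actually
# contain it, as can happen with interactively defined classes:
Base.__module__ = "builtins"

try:
    pickle.dumps(Base("hello"))
except pickle.PicklingError as exc:
    print(exc)  # mirrors the "attribute lookup ... failed" error above
```

Moving the class into a real module, as below, gives it a `__module__` that the worker processes can import, which is why the fix works.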
class Base:
    def __init__(self, line):
        self.line = line
and mainscript.py:
from pyspark import SparkContext, SQLContext
from Base import Base

sc = SparkContext('local')
sqlContext = SQLContext(sc)

def process_line_baseclass(line):
    return Base(line)

input_path = 'path/to/inputfile'
samples = sc.textFile(input_path).map(process_line_baseclass)
print(samples.take(1))
Output:
[<Base.Base object at 0x7fc2188e64a8>]
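If you would rather keep everything in a single notebook, one workaround (my suggestion, not part of the answer above) is to have the map function return a built-in type such as a dict, which pickles by value and needs no importable class:

```python
import pickle

# Return a built-in type instead of a user-defined class; dicts
# (and tuples) pickle without any module lookup involved.
def process_line(line):
    return {"line": line}

# The record survives a pickle round trip, so Spark workers can
# deserialize it without importing anything:
record = process_line("some text")
assert pickle.loads(pickle.dumps(record)) == {"line": "some text"}

# In the notebook you would then write:
# samples = sc.textFile(input_path).map(process_line)
# print(samples.take(1))
```

This trades the convenience of a class for notebook-friendliness; the separate-module approach above is the cleaner fix once the code grows.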