Spark filter 操作中的 pickle 错误

Pickle error on Spark filter

当我用一个引用了外部对象的闭包来过滤 RDD 时，出现了 pickle 错误。

不引用对象时（正常）：

>>> a
MapPartitionsRDD[369] at mapPartitions at SerDeUtil.scala:143
>>> b = a.filter(lambda row: row.foo == 1)
>>> b
PythonRDD[374] at RDD at PythonRDD.scala:43

引用对象时（出错）：

>>> z.foo
1
>>> b = a.filter(lambda row: row.foo == z.foo)
>>> type(b)
<class 'pyspark.rdd.PipelinedRDD'>
>>> b
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/rdd.py", line 142, in __repr__
    return self._jrdd.toString()
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/rdd.py", line 2107, in _jrdd
    pickled_command = ser.dumps(command)
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/serializers.py", line 402, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 816, in dumps
    cp.dump(obj)
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 133, in dump
    return pickle.Pickler.dump(self, obj)
  File "/usr/lib64/python2.6/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.6/pickle.py", line 562, in save_tuple
    save(element)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.6/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.6/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.6/pickle.py", line 633, in _batch_appends
    save(x)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 254, in save_function
    self.save_function_tuple(obj, [themodule])
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 304, in save_function_tuple
    save((code, closure, base_globals))
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.6/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.6/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.6/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 249, in save_function
    self.save_function_tuple(obj, modList)
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 309, in save_function_tuple
    save(f_globals)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.6/pickle.py", line 686, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.6/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 616, in save_reduce
    save(cls)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 467, in save_global
    d),obj=obj)
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 631, in save_reduce
    save(args)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.6/pickle.py", line 548, in save_tuple
    save(element)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 249, in save_function
    self.save_function_tuple(obj, modList)
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 309, in save_function_tuple
    save(f_globals)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 547, in save_inst
    self.save_inst_logic(obj)
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 537, in save_inst_logic
    save(stuff)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 547, in save_inst
    self.save_inst_logic(obj)
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 537, in save_inst_logic
    save(stuff)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.6/pickle.py", line 600, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.6/pickle.py", line 636, in _batch_appends
    save(tmp[0])
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 547, in save_inst
    self.save_inst_logic(obj)
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 537, in save_inst_logic
    save(stuff)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.6/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 650, in save_reduce
    save(state)
  File "/usr/lib64/python2.6/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/python/pyspark/cloudpickle.py", line 174, in save_dict
    pickle.Pickler.save_dict(self, obj)
  File "/usr/lib64/python2.6/pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.6/pickle.py", line 681, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.6/pickle.py", line 313, in save
    (t.__name__, obj))
pickle.PicklingError: Can't pickle 'lock' object: <thread.lock object at 0x7f5f92cc64c8>

我做错了什么?

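z 还包含哪些其他字段？序列化闭包时，被序列化的是整个对象 z，而不仅仅是您访问的那个字段。如果 z 的任何其他字段引用了不可序列化的实体（例如 SparkContext 对象，或回溯末尾所示的线程锁 `thread.lock`），就会出现序列化错误。解决办法是先把需要的值取到局部变量中，再在闭包里引用该变量，例如：`foo = z.foo`，然后 `b = a.filter(lambda row: row.foo == foo)`。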