Pyspark pickling error when trying to upload numpy arrays to s3 using boto3

I am trying to upload my numpy arrays to S3 using a boto3 client from my PySpark application, but I get a pickling error. My code is below.

import os
from io import BytesIO

import boto3
import numpy as np
from keras.applications.vgg16 import preprocess_input
from keras.preprocessing import image

def write_features3(model, key, obj, output_path, format_name):
    try:
        LOGGER.info('executing vgg16 feature extractor...')
        img = image.load_img(BytesIO(obj), target_size=(224, 224, 3))
        img_data = image.img_to_array(img)
        img_data = np.expand_dims(img_data, axis=0)
        img_data = preprocess_input(img_data)
        vgg16_feature = model.predict(img_data)[0]
        LOGGER.info('vgg16 feature shape: %s', vgg16_feature.shape)

        file_name_without_ext = get_file_name_without_ext(key)
        rest_of_path = OUTPUT.split('/', 1)[1]
        s3_full_path = rest_of_path + '/' + file_name_without_ext + '.npy'
        LOGGER.info('Saving to S3....')
        feature_dir = '/home/hadoop'
        s3 = boto3.client('s3', region_name='us-east-1')
        local_dir_full_path = feature_dir + '/' + file_name_without_ext + '.npy'
        np.save(local_dir_full_path, vgg16_feature)
        s3.upload_file(local_dir_full_path, 'test', s3_full_path)
        os.remove(local_dir_full_path)
    except Exception as e:
        print('Error......{}'.format(e.args))
        return []

def write_features_(xs):
    model_data = initVGG16()

    for k, v in xs:
        yield k, write_features3(model_data, k, v, OUTPUT, FORMAT_NAME)

Driver program:
s3_files_rdd = sc.binaryFiles('s3n://....')
features_rdd = s3_files_rdd.foreachPartition(write_features_)

When I try to run this program I get the error below (Spark version 2.2.1). I even tried creating the s3 client inside the write_features_ partition method, but without success; same error.
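Roughly, that variant looked like this (reconstructed from the description; the exact code was not posted):

def write_features_(xs):
    model_data = initVGG16()
    # client created inside the partition function instead of on the driver --
    # this still produced the same pickling error
    s3 = boto3.client('s3', region_name='us-east-1')
    for k, v in xs:
        yield k, write_features3(model_data, k, v, OUTPUT, FORMAT_NAME)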

Error:

n save_reduce
    save(state)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
    save(tmp[0])
  File "/usr/lib64/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 600, in save_reduce
    save(state)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: can't pickle thread.lock objects
Traceback (most recent call last):
  File "six_file_boto3_write1.py", line 249, in <module>
    run()
  File "six_file_boto3_write1.py", line 227, in run
    s3_files_rdd.foreachPartition(write_features_)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 799, in foreachPartition
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 1041, in count
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 1032, in sum
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 906, in fold
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 809, in collect
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 2455, in _jrdd
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 2388, in _wrap_function
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/rdd.py", line 2374, in _prepare_for_python_RDD
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/serializers.py", line 464, in dumps
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 704, in dumps
  File "/mnt/yarn/usercache/hadoop/appcache/application_1541683970451_0004/container_1541683970451_0004_01_000001/pyspark.zip/pyspark/cloudpickle.py", line 162, in dump
pickle.PicklingError: Could not serialize object: TypeError: can't pickle thread.lock objects

The problem was the Spark version; I was using spark-2.2.1. After upgrading to spark-2.3.2, everything started working.
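
For reference, independent of the version bump: "can't pickle thread.lock objects" in PySpark generally means something unpicklable (a boto3 client or session, a Keras/TensorFlow model, a logger with handlers) was captured in the closure that cloudpickle serializes on the driver. The usual workaround is to create those objects inside the partition function so they never have to be pickled. Below is a minimal sketch of that pattern; initVGG16 and extract_feature stand in for the question's model setup and feature code, and the bucket name is a placeholder.

import boto3
import numpy as np
from io import BytesIO

def process_partition(records):
    # Build all unpicklable objects here, on the executor, so nothing
    # problematic is captured in the closure shipped from the driver.
    model = initVGG16()                               # model loader, defined elsewhere
    s3 = boto3.client('s3', region_name='us-east-1')  # per-partition client
    for key, raw_bytes in records:
        feature = extract_feature(model, raw_bytes)   # hypothetical helper
        buf = BytesIO()
        np.save(buf, feature)                         # serialize the array in memory
        s3.put_object(Bucket='test', Key=key + '.npy', Body=buf.getvalue())
        yield key, feature.shape

# mapPartitions is lazy, so an action is needed to run it; unlike
# foreachPartition called with a generator, this consumes the yields.
s3_files_rdd.mapPartitions(process_partition).count()

Writing through BytesIO also skips the temporary file on local disk, though upload_file with a local path, as in the question, works just as well.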