How to use boto3 client with Python multiprocessing?
The code looks like this:
import multiprocessing as mp
from functools import partial
import boto3
import numpy as np

s3 = boto3.client('s3')

def _something(**kwargs):
    # Some mixed integer programming stuff related to the variable archive
    return np.array(some_variable_related_to_archive)

def do(s3):
    archive = np.load(s3.get_object('some_key'))  # Simplified -- details not relevant
    pool = mp.Pool()
    sub_process = partial(_something, slack=0.1)
    parts = np.array_split(archive, some_int)
    target_parts = np.array(things)
    out = pool.starmap(sub_process, [x for x in zip(parts, target_parts)])  # Error occurs at this line
    pool.close()
    pool.join()

do(s3)
Error:
_pickle.PicklingError: Can't pickle <class 'botocore.client.S3'>: attribute lookup S3 on botocore.client failed
I have very limited experience with the Python multiprocessing library. I'm not sure why it throws the above error when the S3 client is not an argument to any of the functions. Note that the code runs fine if the archive file is loaded from disk instead of from S3.
Any help/guidance would be greatly appreciated.
Objects passed to mp.starmap() must be picklable, and the S3 client is not picklable. Moving the S3 client operations outside of the function that calls mp.starmap() solves the problem:
import multiprocessing as mp
from functools import partial
import boto3
import numpy as np

s3 = boto3.client('s3')
archive = np.load(s3.get_object('some_key'))  # Simplified -- details not relevant. The s3 call moves here, outside of the do() function

def _something(**kwargs):
    # Some mixed integer programming stuff related to the variable archive
    return np.array(some_variable_related_to_archive)

def do(archive):  # Pass the previously loaded archive, not the s3 object, into the function
    pool = mp.Pool()
    sub_process = partial(_something, slack=0.1)
    parts = np.array_split(archive, some_int)
    target_parts = np.array(things)
    out = pool.starmap(sub_process, [x for x in zip(parts, target_parts)])
    pool.close()
    pool.join()

do(archive)  # Pass the previously loaded archive, not the s3 object, into the function
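As a quick sanity check (a minimal sketch, not part of the original answer), you can confirm that the client itself is what refuses to pickle; the exact exception type and message depend on the installed boto3/botocore version:

import pickle
import boto3

s3 = boto3.client('s3')
try:
    # botocore builds the S3 client class dynamically, so pickle cannot look it up by name
    pickle.dumps(s3)
except Exception as exc:  # e.g. pickle.PicklingError or TypeError, depending on the version
    print(type(exc).__name__, ':', exc)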
Well, I solved it in a pretty straightforward way: by using a simpler, less complex object instead. I used the Bucket class.
However, you should take a look at the following post: Can't pickle when using multiprocessing Pool.map(). I put every boto3-related object outside of any class function. Some other posts suggest putting the s3 objects and functions inside the function you want to parallelize to avoid overhead; I haven't tried that yet (a rough sketch of that idea follows my code below). In any case, I'll share some code that saves information to the msgpack file type.
My code example is below (outside of any class or function). I hope it helps.
import pandas as pd
import boto3
from botocore.exceptions import ClientError
from pathos.pools import ProcessPool

# 'module', 'ncpus' and 'files' are assumed to be defined elsewhere in the original script.
# Note: to_msgpack()/read_msgpack() were removed in newer pandas versions.
s3 = boto3.resource('s3')
s3_bucket_name = 'bucket-name'
s3_bucket = s3.Bucket(s3_bucket_name)

def msgpack_dump_s3(df, filename):
    try:
        s3_bucket.put_object(Body=df.to_msgpack(), Key=filename)
        print(module, filename + " successfully saved into s3 bucket '" + s3_bucket.name + "'")
    except Exception as e:
        # log everything else as a warning
        print(module, "Failed saving object. Continuing. {}".format(e))

def msgpack_load_s3(filename):
    try:
        return s3_bucket.Object(filename).get()['Body'].read()
    except ClientError as ex:
        if ex.response['Error']['Code'] == 'NoSuchKey':
            print(module, 'No object found - returning None')
            return None
        else:
            print(module, "Failed loading object. Continuing. {}".format(ex))
            raise ex
    except Exception as e:
        # log everything else as a warning
        print(module, "Failed loading object. Continuing. {}".format(e))
        return

def upper_function():
    def function_to_parallelize(filename):
        file = msgpack_load_s3(filename)
        if file is not None:
            df = pd.read_msgpack(file)
            # do something
            print('\t\t\tSaving updated info...')
            msgpack_dump_s3(df, filename)

    pool = ProcessPool(nodes=ncpus)
    # do an asynchronous map, then get the results
    results = pool.imap(function_to_parallelize, files)
    print("...")
    print(list(results))
    # Polling loop left disabled here:
    """
    while not results.ready():
        time.sleep(5)
        print(".", end=' ')
    """