Python 多处理池 apply_async 错误
Python multiprocessing pool apply_async error
我正在尝试评估多处理池中的多个进程,但一直 运行 出错,我无法弄清楚为什么......下面是代码的简化版本:
class Object_1():
def add_godd_spd_column()
def calculate_correlations(arg1, arg2, arg3):
return {'a': 1}
processes = {}
pool = Pool(processes=6)
for i in range(1, 10):
processes[i] = pool.apply_async(calculate_correlations,
args=(arg1, arg2, arg3,))
correlations = {}
for i in range(0, 10):
correlations[i] = processes[i].get()
此returns以下错误:
Traceback (most recent call last):
File "./02_results.py", line 116, in <module>
correlations[0] = processes[0].get()
File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 608, in get
raise self._value
File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 385, in
_handle_tasks
put(task)
File "/opt/anaconda3/lib/python3.5/multiprocessing/connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "/opt/anaconda3/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'SCADA.add_good_spd_column.<locals>.calculate_correlations
当我调用以下内容时:
相关[0].成功()
我收到以下错误:
Traceback (most recent call last):
File "./02_results.py", line 116, in <module>
print(processes[0].successful())
File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 595, in
successful
assert self.ready()
AssertionError
这是因为在调用 .get() 之前进程实际上并未完成吗?正在评估的函数只是 returns 一个绝对应该可以 pickle-able 的字典...
干杯,
错误发生是因为 pickling a function nested in another function is not supported,multiprocessing.Pool
需要 pickle 您作为参数传递给 apply_async
的函数,以便在工作进程中执行它。您必须将该函数移动到模块的顶层,或者使其成为 class 的实例方法。请记住,如果将其设为实例方法,class 本身的实例也必须是可腌制的。
是的,调用 successful()
时出现断言错误是因为您在结果准备好之前调用它。 From the docs:
successful()
Return whether the call completed without raising an exception. Will raise AssertionError
if the result is not ready.
我正在尝试评估多处理池中的多个进程,但一直 运行 出错,我无法弄清楚为什么......下面是代码的简化版本:
class Object_1():
def add_godd_spd_column()
def calculate_correlations(arg1, arg2, arg3):
return {'a': 1}
processes = {}
pool = Pool(processes=6)
for i in range(1, 10):
processes[i] = pool.apply_async(calculate_correlations,
args=(arg1, arg2, arg3,))
correlations = {}
for i in range(0, 10):
correlations[i] = processes[i].get()
此returns以下错误:
Traceback (most recent call last):
File "./02_results.py", line 116, in <module>
correlations[0] = processes[0].get()
File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 608, in get
raise self._value
File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 385, in
_handle_tasks
put(task)
File "/opt/anaconda3/lib/python3.5/multiprocessing/connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "/opt/anaconda3/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'SCADA.add_good_spd_column.<locals>.calculate_correlations
当我调用以下内容时: 相关[0].成功() 我收到以下错误:
Traceback (most recent call last):
File "./02_results.py", line 116, in <module>
print(processes[0].successful())
File "/opt/anaconda3/lib/python3.5/multiprocessing/pool.py", line 595, in
successful
assert self.ready()
AssertionError
这是因为在调用 .get() 之前进程实际上并未完成吗?正在评估的函数只是 returns 一个绝对应该可以 pickle-able 的字典...
干杯,
错误发生是因为 pickling a function nested in another function is not supported,multiprocessing.Pool
需要 pickle 您作为参数传递给 apply_async
的函数,以便在工作进程中执行它。您必须将该函数移动到模块的顶层,或者使其成为 class 的实例方法。请记住,如果将其设为实例方法,class 本身的实例也必须是可腌制的。
是的,调用 successful()
时出现断言错误是因为您在结果准备好之前调用它。 From the docs:
successful()
Return whether the call completed without raising an exception. Will raise
AssertionError
if the result is not ready.