无法将动态 类 与 concurrent.futures.ProcessPoolExecutor 一起使用
Unable to use dynamic classes with concurrent.futures.ProcessPoolExecutor
在下面的代码中,我使用 generate_object
方法在 _py
属性中动态创建了 class 的对象。
如果我不使用并发方法,代码可以完美运行。但是,如果我使用 concurrent.futures
的并发,我不会得到想要的结果,因为出现错误提示(除其他外):
_pickle.PicklingError: Can't pickle <class '__main__.Script_0_1'>: attribute lookup Script_0_1 on __main__ failed
在谷歌搜索这个错误后,我了解到在 ProcessPoolExecutor.map()
中只有可腌制的对象将作为参数传递,所以我决定看看如何将我的动态 class 变成可腌制的。
问题是该问题的所有其他解决方案都以不同的方式创建动态对象(与我在 _string_to_object()
中使用的不同)。示例: and 2
我非常想保持动态对象的创建方式,因为我的很多实际代码都是基于它的,因此我正在寻找一个与下面这个玩具代码一起工作的并发解决方案。
代码
import random
import codecs
import re
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
class A:
def __init__(self):
self._py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''
def generate_text(self, name_1, name_2):
py = self._py.format(name_1, name_2)
py = codecs.decode(py, 'unicode_escape')
return py
def generate_object(self, number_1, number_2):
""" Generate an object of the class inside the string self._py """
return self._string_to_object(self.generate_text(number_1, number_2))
def _string_to_object(self, str_class, *args, **kwargs):
""" Transform a program written inside str_class to an object. """
exec(str_class)
class_name = re.search("class (.*):", str_class).group(1).partition("(")[0]
return locals()[class_name](*args, **kwargs)
from functools import partial
print('Single usage')
a = A()
script = a.generate_object(1, 2)
script.print_numbers()
print('Multiprocessing usage')
n_cores = 3
n_calls = 3
def concurrent_function(args):
first_A = args[0]
second_A = args[1]
first_A.print_numbers()
second_A.print_numbers()
with ProcessPoolExecutor(max_workers=n_cores) as executor:
args = ( (A().generate_object(i, i+1), A().generate_object(i+1, i+2)) for i in range(n_calls))
results = executor.map(concurrent_function, args)
我想不出一种方法来严格按照您当前的方案在全局名称 space 中创建 Script
classes。然而:
既然每次调用方法 generate_object
都会在本地名称 space 中创建一个新的 class 并实例化该 class 的对象,为什么不推迟在进程池中完成它的工作?这还有一个额外的好处,即可以并行执行此 class-creation 处理,并且不需要酸洗。我们现在将两个整数参数 number_1
和 number_2
:
传递给 concurrent_function
import random
import codecs
import re
from concurrent.futures import ProcessPoolExecutor
class A:
def __init__(self):
self._py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''
def generate_text(self, name_1, name_2):
py = self._py.format(name_1, name_2)
py = codecs.decode(py, 'unicode_escape')
return py
def generate_object(self, number_1, number_2):
""" Generate an object of the class inside the string self._py """
return self._string_to_object(self.generate_text(number_1, number_2))
def _string_to_object(self, str_class, *args, **kwargs):
""" Transform a program written inside str_class to an object. """
exec(str_class)
class_name = re.search("class (.*):", str_class).group(1).partition("(")[0]
return locals()[class_name](*args, **kwargs)
"""
from functools import partial
print('Single usage')
a = A()
script = a.generate_object(1, 2)
script.print_numbers()
"""
def concurrent_function(args):
for arg in args:
obj = A().generate_object(arg[0], arg[1])
obj.print_numbers()
def main():
print('Multiprocessing usage')
n_cores = 3
n_calls = 3
with ProcessPoolExecutor(max_workers=n_cores) as executor:
args = ( ((i, i+1), (i+1, i+2)) for i in range(n_calls))
# wait for completion of all tasks:
results = list(executor.map(concurrent_function, args))
if __name__ == '__main__':
main()
打印:
Multiprocessing usage
Numbers = 0 and 1
Numbers = 1 and 2
Numbers = 1 and 2
Numbers = 2 and 3
Numbers = 2 and 3
Numbers = 3 and 4
更高效的方法
不需要使用exec
。而是使用闭包:
from concurrent.futures import ProcessPoolExecutor
def make_print_function(number_1, number_2):
def print_numbers():
print(f'Numbers = {number_1} and {number_2}')
return print_numbers
def concurrent_function(args):
for arg in args:
fn = make_print_function(arg[0], arg[1])
fn()
def main():
print('Multiprocessing usage')
n_cores = 3
n_calls = 3
with ProcessPoolExecutor(max_workers=n_cores) as executor:
args = ( ((i, i+1), (i+1, i+2)) for i in range(n_calls))
# wait for completion of all tasks:
results = list(executor.map(concurrent_function, args))
if __name__ == '__main__':
main()
打印:
Multiprocessing usage
Numbers = 0 and 1
Numbers = 1 and 2
Numbers = 1 and 2
Numbers = 2 and 3
Numbers = 2 and 3
Numbers = 3 and 4
使用对象缓存避免不必要地创建新对象
obj_cache = {} # each process will have its own
def concurrent_function(args):
for arg in args:
# was an object created with this set of arguments: (arg[0], arg[1])?
obj = obj_cache.get(arg)
if obj is None: # must create new object
obj = A().generate_object(arg[0], arg[1])
obj_cache[arg] = obj # save object for possible future use
obj.print_numbers()
可能我找到了一种无需 exec()
函数即可执行此操作的方法。实施(带评论)如下。
import codecs
from concurrent.futures import ProcessPoolExecutor
class A:
def __init__(self):
self.py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''
def generate_text(self, number_1, number_2):
py = self.py.format(number_1, number_2)
py = codecs.decode(py, 'unicode_escape')
return py
def generate_object(self, number_1, number_2):
class_code = self.generate_text(number_1, number_2)
# Create file in disk
with open("Script_" + str(number_1) + "_" + str(number_2) + ".py", "w") as file:
file.write(class_code)
# Now import it and the class will now be (correctly) seen in __main__
package = "Script_" + str(number_1) + "_" + str(number_2)
class_name = "Script_" + str(number_1) + "_" + str(number_2)
# This is the programmatically version of
# from <package> import <class_name>
class_name = getattr(__import__(package, fromlist=[class_name]), class_name)
return class_name()
def concurrent_function(args):
first_A = args[0]
second_A = args[1]
first_A.print_numbers()
second_A.print_numbers()
def main():
print('Multiprocessing usage')
n_cores = 3
n_calls = 2
with ProcessPoolExecutor(max_workers=n_cores) as executor:
args = ( (A().generate_object(i, i+1), A().generate_object(i+2, i+3)) for i in range(n_calls))
results = executor.map(concurrent_function, args)
if __name__ == '__main__':
main()
基本上我所做的不是动态分配 class,而是将其写入文件。我这样做是因为我遇到的问题的根源是 pickle 在查看全局范围时无法正确定位嵌套的 class 。现在我以编程方式导入 class(将其保存到文件后)。
当然,这个方案也有处理文件的瓶颈,成本也很高。我没有测量处理文件或 exec
是否更快,但在我的 real-world 情况下,我只需要合成 class 的一个对象(而不是像玩具中那样每次并行调用一个对象提供的代码),因此文件选项最适合我。
还有一个问题:在使用n_calls = 15
(例如)并执行多次后,有时似乎无法导入模块(刚刚创建的文件)。我试图在以编程方式导入它之前放置一个 sleep()
但它没有帮助。使用少量调用时似乎不会发生此问题,而且似乎也是随机发生的。部分错误堆栈的示例如下所示:
Traceback (most recent call last):
File "main.py", line 45, in <module>
main()
File "main.py", line 42, in main
results = executor.map(concurrent_function, args)
File "/usr/lib/python3.8/concurrent/futures/process.py", line 674, in map
results = super().map(partial(_process_chunk, fn),
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 600, in map
fs = [self.submit(fn, *args) for args in zip(*iterables)]
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 600, in <listcomp>
fs = [self.submit(fn, *args) for args in zip(*iterables)]
File "/usr/lib/python3.8/concurrent/futures/process.py", line 184, in _get_chunks
chunk = tuple(itertools.islice(it, chunksize))
File "main.py", line 41, in <genexpr>
args = ( (A().generate_object(i, i+1), A().generate_object(i+2, i+3)) for i in range(n_calls))
File "main.py", line 26, in generate_object
class_name = getattr(__import__(package, fromlist=[class_name]), class_name)
ModuleNotFoundError: No module named 'Script_13_14'
在下面的代码中,我使用 generate_object
方法在 _py
属性中动态创建了 class 的对象。
如果我不使用并发方法,代码可以完美运行。但是,如果我使用 concurrent.futures
的并发,我不会得到想要的结果,因为出现错误提示(除其他外):
_pickle.PicklingError: Can't pickle <class '__main__.Script_0_1'>: attribute lookup Script_0_1 on __main__ failed
在谷歌搜索这个错误后,我了解到在 ProcessPoolExecutor.map()
中只有可腌制的对象将作为参数传递,所以我决定看看如何将我的动态 class 变成可腌制的。
问题是该问题的所有其他解决方案都以不同的方式创建动态对象(与我在 _string_to_object()
中使用的不同)。示例:
我非常想保持动态对象的创建方式,因为我的很多实际代码都是基于它的,因此我正在寻找一个与下面这个玩具代码一起工作的并发解决方案。
代码
import random
import codecs
import re
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
class A:
def __init__(self):
self._py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''
def generate_text(self, name_1, name_2):
py = self._py.format(name_1, name_2)
py = codecs.decode(py, 'unicode_escape')
return py
def generate_object(self, number_1, number_2):
""" Generate an object of the class inside the string self._py """
return self._string_to_object(self.generate_text(number_1, number_2))
def _string_to_object(self, str_class, *args, **kwargs):
""" Transform a program written inside str_class to an object. """
exec(str_class)
class_name = re.search("class (.*):", str_class).group(1).partition("(")[0]
return locals()[class_name](*args, **kwargs)
from functools import partial
print('Single usage')
a = A()
script = a.generate_object(1, 2)
script.print_numbers()
print('Multiprocessing usage')
n_cores = 3
n_calls = 3
def concurrent_function(args):
first_A = args[0]
second_A = args[1]
first_A.print_numbers()
second_A.print_numbers()
with ProcessPoolExecutor(max_workers=n_cores) as executor:
args = ( (A().generate_object(i, i+1), A().generate_object(i+1, i+2)) for i in range(n_calls))
results = executor.map(concurrent_function, args)
我想不出一种方法来严格按照您当前的方案在全局名称 space 中创建 Script
classes。然而:
既然每次调用方法 generate_object
都会在本地名称 space 中创建一个新的 class 并实例化该 class 的对象,为什么不推迟在进程池中完成它的工作?这还有一个额外的好处,即可以并行执行此 class-creation 处理,并且不需要酸洗。我们现在将两个整数参数 number_1
和 number_2
:
concurrent_function
import random
import codecs
import re
from concurrent.futures import ProcessPoolExecutor
class A:
def __init__(self):
self._py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''
def generate_text(self, name_1, name_2):
py = self._py.format(name_1, name_2)
py = codecs.decode(py, 'unicode_escape')
return py
def generate_object(self, number_1, number_2):
""" Generate an object of the class inside the string self._py """
return self._string_to_object(self.generate_text(number_1, number_2))
def _string_to_object(self, str_class, *args, **kwargs):
""" Transform a program written inside str_class to an object. """
exec(str_class)
class_name = re.search("class (.*):", str_class).group(1).partition("(")[0]
return locals()[class_name](*args, **kwargs)
"""
from functools import partial
print('Single usage')
a = A()
script = a.generate_object(1, 2)
script.print_numbers()
"""
def concurrent_function(args):
for arg in args:
obj = A().generate_object(arg[0], arg[1])
obj.print_numbers()
def main():
print('Multiprocessing usage')
n_cores = 3
n_calls = 3
with ProcessPoolExecutor(max_workers=n_cores) as executor:
args = ( ((i, i+1), (i+1, i+2)) for i in range(n_calls))
# wait for completion of all tasks:
results = list(executor.map(concurrent_function, args))
if __name__ == '__main__':
main()
打印:
Multiprocessing usage
Numbers = 0 and 1
Numbers = 1 and 2
Numbers = 1 and 2
Numbers = 2 and 3
Numbers = 2 and 3
Numbers = 3 and 4
更高效的方法
不需要使用exec
。而是使用闭包:
from concurrent.futures import ProcessPoolExecutor
def make_print_function(number_1, number_2):
def print_numbers():
print(f'Numbers = {number_1} and {number_2}')
return print_numbers
def concurrent_function(args):
for arg in args:
fn = make_print_function(arg[0], arg[1])
fn()
def main():
print('Multiprocessing usage')
n_cores = 3
n_calls = 3
with ProcessPoolExecutor(max_workers=n_cores) as executor:
args = ( ((i, i+1), (i+1, i+2)) for i in range(n_calls))
# wait for completion of all tasks:
results = list(executor.map(concurrent_function, args))
if __name__ == '__main__':
main()
打印:
Multiprocessing usage
Numbers = 0 and 1
Numbers = 1 and 2
Numbers = 1 and 2
Numbers = 2 and 3
Numbers = 2 and 3
Numbers = 3 and 4
使用对象缓存避免不必要地创建新对象
obj_cache = {} # each process will have its own
def concurrent_function(args):
for arg in args:
# was an object created with this set of arguments: (arg[0], arg[1])?
obj = obj_cache.get(arg)
if obj is None: # must create new object
obj = A().generate_object(arg[0], arg[1])
obj_cache[arg] = obj # save object for possible future use
obj.print_numbers()
可能我找到了一种无需 exec()
函数即可执行此操作的方法。实施(带评论)如下。
import codecs
from concurrent.futures import ProcessPoolExecutor
class A:
def __init__(self):
self.py = r'''
class Script_{0}_{1}:
\tdef print_numbers(self):
\t\tprint('Numbers = ', {0}, 'and', {1})
'''
def generate_text(self, number_1, number_2):
py = self.py.format(number_1, number_2)
py = codecs.decode(py, 'unicode_escape')
return py
def generate_object(self, number_1, number_2):
class_code = self.generate_text(number_1, number_2)
# Create file in disk
with open("Script_" + str(number_1) + "_" + str(number_2) + ".py", "w") as file:
file.write(class_code)
# Now import it and the class will now be (correctly) seen in __main__
package = "Script_" + str(number_1) + "_" + str(number_2)
class_name = "Script_" + str(number_1) + "_" + str(number_2)
# This is the programmatically version of
# from <package> import <class_name>
class_name = getattr(__import__(package, fromlist=[class_name]), class_name)
return class_name()
def concurrent_function(args):
first_A = args[0]
second_A = args[1]
first_A.print_numbers()
second_A.print_numbers()
def main():
print('Multiprocessing usage')
n_cores = 3
n_calls = 2
with ProcessPoolExecutor(max_workers=n_cores) as executor:
args = ( (A().generate_object(i, i+1), A().generate_object(i+2, i+3)) for i in range(n_calls))
results = executor.map(concurrent_function, args)
if __name__ == '__main__':
main()
基本上我所做的不是动态分配 class,而是将其写入文件。我这样做是因为我遇到的问题的根源是 pickle 在查看全局范围时无法正确定位嵌套的 class 。现在我以编程方式导入 class(将其保存到文件后)。
当然,这个方案也有处理文件的瓶颈,成本也很高。我没有测量处理文件或 exec
是否更快,但在我的 real-world 情况下,我只需要合成 class 的一个对象(而不是像玩具中那样每次并行调用一个对象提供的代码),因此文件选项最适合我。
还有一个问题:在使用n_calls = 15
(例如)并执行多次后,有时似乎无法导入模块(刚刚创建的文件)。我试图在以编程方式导入它之前放置一个 sleep()
但它没有帮助。使用少量调用时似乎不会发生此问题,而且似乎也是随机发生的。部分错误堆栈的示例如下所示:
Traceback (most recent call last):
File "main.py", line 45, in <module>
main()
File "main.py", line 42, in main
results = executor.map(concurrent_function, args)
File "/usr/lib/python3.8/concurrent/futures/process.py", line 674, in map
results = super().map(partial(_process_chunk, fn),
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 600, in map
fs = [self.submit(fn, *args) for args in zip(*iterables)]
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 600, in <listcomp>
fs = [self.submit(fn, *args) for args in zip(*iterables)]
File "/usr/lib/python3.8/concurrent/futures/process.py", line 184, in _get_chunks
chunk = tuple(itertools.islice(it, chunksize))
File "main.py", line 41, in <genexpr>
args = ( (A().generate_object(i, i+1), A().generate_object(i+2, i+3)) for i in range(n_calls))
File "main.py", line 26, in generate_object
class_name = getattr(__import__(package, fromlist=[class_name]), class_name)
ModuleNotFoundError: No module named 'Script_13_14'