Effectively saving instance attributes with nested multiprocessing Pools/Processes
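I have two custom Python classes. The first has a method that performs some calculations (using a Pool) and creates a new instance attribute. The second aggregates two objects of the first class and has a method that I want to use to dispatch that calculation in both first-class objects (also in parallel) and correctly save their new instance attributes.
Dummy code: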
from multiprocessing import Pool, Process

class State:
    def __init__(self, data):
        self.data = data

    def calculate(self):
        # function and args are placeholders for the real work
        with Pool() as p:
            p.map(function, args)
        new_attribute = ...  # some code that reads the files generated with the Pool
        self.new_attribute = new_attribute

class Pair:
    def __init__(self, state1: State, state2: State):
        self.state1 = state1
        self.state2 = state2

    def calculate_states(self):
        for state in [self.state1, self.state2]:
            # Each child process works on its own copy of state
            p = Process(target=state.calculate)
            p.start()

state1 = State(data1)
state2 = State(data2)
pair = Pair(state1, state2)
pair.calculate_states()
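The problem is that, as I found out during extensive research on this issue, multiprocessing.Process creates a copy of the namespace in which the process works, and the values are not returned to the main namespace. Setting process.daemon to True produces an error, because "daemonic processes are not allowed to have children", which is the same thing that happens when I swap the Processes for an additional Pool. Using multiprocess (instead of multiprocessing) or concurrent.futures doesn't seem to work either. In addition, I don't understand how multiprocessing.Queue works, and I'm not sure whether it can be applied here (I read somewhere that it can).
I would like to achieve this without having to pass a shared-memory object to the processes (writing new_attribute into it and then applying it to the states in the main namespace). Even though I haven't provided working code or a reproducible example, I hope someone can point me towards a solution.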
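The copy behaviour described above can be reproduced with a minimal sketch. It assumes a stand-in calculation (squaring `data`) and two hypothetical helpers, `worker` and `run_in_child`, none of which come from the original code. It also shows one way a `multiprocessing.Queue` could carry the value back to the parent, where it can be assigned to the real object.

```python
from multiprocessing import Process, Queue


class State:
    def __init__(self, data):
        self.data = data

    def calculate(self):
        # Stand-in for the real computation (assumption: squares the data).
        self.new_attribute = self.data ** 2


def worker(state, queue):
    # Runs in the child process, on the child's own copy of `state`.
    state.calculate()
    # The parent's object is untouched, so send the value back explicitly.
    queue.put(state.new_attribute)


def run_in_child(state):
    queue = Queue()
    p = Process(target=worker, args=(state, queue))
    p.start()
    result = queue.get()  # read before join() so the child can flush its data
    p.join()
    return result


if __name__ == '__main__':
    state = State(3)
    result = run_in_child(state)
    print(hasattr(state, 'new_attribute'))  # False: only the child's copy changed
    print(result)                           # 9
    state.new_attribute = result            # apply the value in the parent
```

The queue is only a transport here: the attribute still has to be assigned in the parent process, because that is the only place where the original `State` object lives.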
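Your problem arises because you invoke the calculate method in a new subprocess. You can still compute the new attributes in parallel without doing that, by using map_async with a callback argument. The callback runs in the main process, so the attribute it sets is saved on the original object.
I have taken your code and supplied the missing function implementations to demonstrate: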
from multiprocessing import Pool, cpu_count

def some_code(data):
    # Stand-in for the code that reads the files generated with the Pool
    if data == 1:
        return 1032
    if data == 2:
        return 9874
    raise ValueError(f'Invalid data value: {data}')

def function(val):
    ...
    # return value is not of interest

class State:
    def __init__(self, data):
        self.data = data

    def calculate(self, pool, args):
        # Submit the work without blocking; callback runs in the main process
        pool.map_async(function, args, callback=self.callback)

    def callback(self, result):
        """
        Called when map_async completes
        """
        new_attribute = some_code(self.data)
        self.new_attribute = new_attribute

class Pair:
    def __init__(self, state1: State, state2: State):
        self.state1 = state1
        self.state2 = state2

    def calculate_states(self):
        args = (6, 9, 18)
        # Assumption is computation is VERY CPU-intensive
        # If there is quite a bit of I/O involved then: pool_size = 2 * len(args)
        # If it's mostly I/O you should have been using multithreading to begin with
        pool_size = min(2 * len(args), cpu_count())
        with Pool(pool_size) as pool:
            for state in [self.state1, self.state2]:
                state.calculate(pool, args)
            # wait for tasks to complete (must happen inside the with block)
            pool.close()
            pool.join()

# Required for Windows:
if __name__ == '__main__':
    data1 = 1
    data2 = 2
    state1 = State(data1)
    state2 = State(data2)
    pair = Pair(state1, state2)
    pair.calculate_states()
    print(state1.new_attribute, state2.new_attribute)
Prints:
1032 9874
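One caveat with the callback approach: the `callback` passed to `map_async` is only invoked when every task succeeds. If a worker raises, the attribute is never set and the failure is easy to miss. A possible variant, sketched below, also passes `error_callback`. The failing `function`, the `on_success`/`on_error` names, the free function `calculate_states`, and deriving the attribute from the results rather than from files are all assumptions made for illustration.

```python
from multiprocessing import Pool


def function(val):
    # Hypothetical worker: rejects negative input to simulate a failure.
    if val < 0:
        raise ValueError(f'negative input: {val}')
    return val * 2


class State:
    def __init__(self, data):
        self.data = data
        self.new_attribute = None
        self.error = None

    def calculate(self, pool, args):
        # Both callbacks run in the parent process, so assignments persist.
        return pool.map_async(function, args,
                              callback=self.on_success,
                              error_callback=self.on_error)

    def on_success(self, results):
        # Assumption: the attribute is derived from the returned values.
        self.new_attribute = sum(results)

    def on_error(self, exc):
        # Record the first exception raised by a worker.
        self.error = exc


def calculate_states(states, args_per_state):
    with Pool(2) as pool:
        for state, args in zip(states, args_per_state):
            state.calculate(pool, args)
        # Wait for all tasks (and their callbacks) before leaving the block.
        pool.close()
        pool.join()


if __name__ == '__main__':
    good, bad = State(1), State(2)
    calculate_states([good, bad], [(1, 2, 3), (1, -2, 3)])
    print(good.new_attribute, good.error)               # 12 None
    print(bad.new_attribute, type(bad.error).__name__)  # None ValueError
```

Alternatively, keeping the `AsyncResult` returned by `map_async` and calling its `get()` method re-raises the worker's exception in the parent.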