Python multiprocessing 合并来自多个进程的字典
Python multiprocessing merge dictionaries of dictionaries from multiple processes
我正在尝试跨多个进程使用共享内存来更新包含字典的字典……我尝试使用 multiprocessing 模块中的 Manager,但很难向它添加字典。请参阅下面的代码和注释。本质上,这段代码应该在另一个名为 "output" 的字典中创建输入的副本。等这部分代码跑通之后,会再加入只从输入中复制某些 "blades" 的逻辑,但 node/cluster/blade 的层次结构必须保持不变。
from multiprocessing import Process, Lock, Manager
# Define dictionary that will be used in this example
# Sample topology used throughout this example: two nodes, each with an IP
# address and two clusters, each cluster holding two blades (name -> IP).
# NOTE(review): the name `input` shadows the builtin input(); kept for fidelity.
input = {
    "Node_1": {
        "IP": "127.0.0.1",
        "clusters": {
            "cluster_1": {
                "blades": {
                    "blade_0_1": "127.0.1.1",
                    "blade_0_2": "127.0.1.2"
                }
            },
            "cluster_2": {
                "blades": {
                    "blade_0_3": "127.0.1.3",
                    "blade_0_4": "127.0.1.4"
                }
            }
        }
    },
    "Node_2": {
        "IP": "127.0.0.2",
        "clusters": {
            "cluster_1": {
                "blades": {
                    "blade_0_1": "127.0.1.1",
                    "blade_0_2": "127.0.1.2"
                }
            },
            "cluster_2": {
                "blades": {
                    "blade_0_3": "127.0.1.3",
                    "blade_0_4": "127.0.1.4"
                }
            }
        }
    }
}
def iterate_over_clusters_in_node(input, node, lock, output):
    """ Iterate over the clusters in the node, then over the blades in the cluster. Add each blade to the output dictionary."""
    # `input` is a plain nested dict, `output` is a Manager().dict() proxy
    # shared between processes, and `lock` serialises stdout and `output` use.
    for cluster in input[node]['clusters']:
        for blade in input[node]['clusters'][cluster]['blades']:
            # Hold the lock while printing so lines from concurrent
            # processes do not interleave.
            with lock:
                print "node: " + node + ", node_IP: " + input[node]['IP'] + ", cluster: " + cluster + ", Blade: " + blade + ", cluster_IP: " + input[node]['clusters'][cluster]['blades'][blade]
            # Hold the lock across the read-modify-write of the shared dict.
            with lock:
                add_blade_to_output(input, node, cluster, blade, output)
def add_blade_to_output(input, node, cluster, blade, output):
    ''' Adds a blade to the managed output dictionary.

    BUG (this is the question's demonstration): `output` is a Manager dict
    proxy.  `output[node] = {}` stores a plain dict in the manager process,
    but `output[node]['IP'] = ...` mutates only a local copy fetched from
    the proxy -- the mutation is never propagated back.  Hence the KeyErrors
    noted in the comments below.
    '''
    if node not in output:
        output[node] = {}
        # These two writes land on a temporary copy returned by the proxy,
        # not on the value stored in the manager process.
        output[node]['IP'] = input[node]['IP']
        output[node]['clusters'] = {}
    # At this point, I would expect output[node]['IP'] and output[node]['clusters'] to exist
    # But the following print raises KeyError: 'IP'
    print output[node]['IP']
    if cluster not in output[node]['clusters']:
        # Raises KeyError: 'clusters'
        output[node]['clusters'][cluster] = {}
        output[node]['clusters'][cluster]['blades'] = {}
    output[node]['clusters'][cluster]['blades'][blade] = input[node]['clusters'][cluster]['blades'][blade]
if __name__ == "__main__":
# Create lock to ensure correct handling of output from multiple processes
lock = Lock()
# Create dictionary to hold any failed blades so that appropriate measures can be taken
# Must use a Manager so that the dictionary can be shared among processes
manager = Manager()
output = manager.dict()
# Create array to track our processes
procs = []
# Iterate over all nodes in input
for node in input:
p = Process(target = iterate_over_clusters_in_node, args = (input, node, lock, output))
p.start()
procs.append(p)
# Join processes and wait for them to stop
for p in procs:
p.join()
print "The final output is:"
print output
# Expectation: should print the same dictionary as the input
# Actual: prints "{'Node_2': {}, 'Node_1': {}}"
我是否需要将 manager.dict() 添加到输出 [node] 而不是内置字典类型?还是我做错了?
谢谢!
编辑:我不反对将其切换为 "threading" 实现而不是 "multiprocessing." 我是 运行 并行事物的新手,所以如果线程更适合这个内存共享的类型,请告诉我。
编辑:这是工作代码:
from multiprocessing import Process, Lock, Manager
# Define dictionary that will be used in this example
# Sample topology (same as in the question): two nodes, each with an IP
# address and two clusters, each cluster holding two blades (name -> IP).
# NOTE(review): the name `input` shadows the builtin input(); kept for fidelity.
input = {
    "Node_1": {
        "IP": "127.0.0.1",
        "clusters": {
            "cluster_1": {
                "blades": {
                    "blade_0_1": "127.0.1.1",
                    "blade_0_2": "127.0.1.2"
                }
            },
            "cluster_2": {
                "blades": {
                    "blade_0_3": "127.0.1.3",
                    "blade_0_4": "127.0.1.4"
                }
            }
        }
    },
    "Node_2": {
        "IP": "127.0.0.2",
        "clusters": {
            "cluster_1": {
                "blades": {
                    "blade_0_1": "127.0.1.1",
                    "blade_0_2": "127.0.1.2"
                }
            },
            "cluster_2": {
                "blades": {
                    "blade_0_3": "127.0.1.3",
                    "blade_0_4": "127.0.1.4"
                }
            }
        }
    }
}
# Create dictionary to hold any failed blades so that appropriate measures can be taken
# Must use a Manager so that the dictionary can be shared among processes
# NOTE(review): created at import time, outside the __main__ guard, so the
# worker functions can reference it as a module-level global.  This relies on
# fork-style process start; on spawn platforms (e.g. Windows) each child would
# re-create its own Manager here -- TODO confirm target platform.
manager = Manager()
output = manager.dict()
def iterate_over_clusters_in_node(input, node, lock):
    """ Iterate over the clusters in the node, then over the blades in the cluster. Add each blade to the output dictionary."""
    # Walks every blade of every cluster under `node` and copies it into the
    # module-level managed dict `output` via add_blade_to_output.
    for cluster in input[node]['clusters']:
        for blade in input[node]['clusters'][cluster]['blades']:
            # The lock makes the read-modify-write inside add_blade_to_output
            # atomic with respect to the other worker processes.
            with lock:
                add_blade_to_output(input, node, cluster, blade)
def add_blade_to_output(input, node, cluster, blade):
    ''' Adds a blade to the managed output dictionary.

    Works around the Manager-proxy limitation: mutating a nested value
    fetched from a dict proxy is not propagated back to the manager, so
    every change is built on a local plain dict and then pushed back in one
    output.update(...) call.  The caller must hold the shared lock, because
    this is a read-modify-write of the whole `output` contents.
    '''
    if node not in output:
        # First blade for this node: seed the node entry through update()
        # so the new value is actually stored by the manager.
        new_node = {}
        new_node[node] = {'IP': input[node]['IP'], 'clusters': {}}
        output.update(new_node)
    # Pull a plain-dict snapshot of the shared state and mutate it locally...
    new_node = {}
    new_node.update(output)
    if cluster not in output[node]['clusters']:
        new_node[node]['clusters'][cluster] = {}
        new_node[node]['clusters'][cluster]['blades'] = {blade: input[node]['clusters'][cluster]['blades'][blade]}
    else:
        new_node[node]['clusters'][cluster]['blades'][blade] = input[node]['clusters'][cluster]['blades'][blade]
    # ...then push the snapshot back through the proxy in a single call.
    output.update(new_node)
if __name__ == "__main__":
# Create lock to ensure correct handling of output from multiple processes
lock = Lock()
# Create array to track our processes
procs = []
# Iterate over all nodes in input
for node in input:
p = Process(target = iterate_over_clusters_in_node, args = (input, node, lock))
p.start()
procs.append(p)
# Join processes and wait for them to stop
for p in procs:
p.join()
print "The final output is:"
print output
根据 python 文档,
Modifications to mutable values or items in dict and list proxies will
not be propagated through the manager, because the proxy has no way of
knowing when its values or items are modified. To modify such an item,
you can re-assign the modified object to the container proxy.
根据这些信息,我们可以相应地更新管理器:
#output[node] = {}
#output[node]['IP'] = input[node]['IP']
#output[node]['clusters'] = {} These changes are not propagated through the manager
new_node = {}
new_node[node] = {'IP': input[node]['IP'], 'clusters': {}}
output.update(new_node)
#if cluster not in output[node]['clusters']:
# Raises KeyError: 'clusters'
#output[node]['clusters'][cluster] = {}
#output[node]['clusters'][cluster]['blades'] = {}
#output[node]['clusters'][cluster]['blades'][blade] = input[node]['clusters'][cluster]['blades'][blade]
node_copy = output.copy()
if cluster not in node_copy[node]['clusters']:
node_copy[node]['clusters'].setdefault(cluster, {'blades': {}})
node_copy[node]['clusters'][cluster]['blades'][blade] = input[node]['clusters'][cluster]['blades'][blade]
output.update(node_copy)
参考资料
我正在尝试跨多个进程使用共享内存来更新包含字典的字典……我尝试使用 multiprocessing 模块中的 Manager,但很难向它添加字典。请参阅下面的代码和注释。本质上,这段代码应该在另一个名为 "output" 的字典中创建输入的副本。等这部分代码跑通之后,会再加入只从输入中复制某些 "blades" 的逻辑,但 node/cluster/blade 的层次结构必须保持不变。
from multiprocessing import Process, Lock, Manager
# Define dictionary that will be used in this example
# Sample topology used throughout this example: two nodes, each with an IP
# address and two clusters, each cluster holding two blades (name -> IP).
# NOTE(review): the name `input` shadows the builtin input(); kept for fidelity.
input = {
    "Node_1": {
        "IP": "127.0.0.1",
        "clusters": {
            "cluster_1": {
                "blades": {
                    "blade_0_1": "127.0.1.1",
                    "blade_0_2": "127.0.1.2"
                }
            },
            "cluster_2": {
                "blades": {
                    "blade_0_3": "127.0.1.3",
                    "blade_0_4": "127.0.1.4"
                }
            }
        }
    },
    "Node_2": {
        "IP": "127.0.0.2",
        "clusters": {
            "cluster_1": {
                "blades": {
                    "blade_0_1": "127.0.1.1",
                    "blade_0_2": "127.0.1.2"
                }
            },
            "cluster_2": {
                "blades": {
                    "blade_0_3": "127.0.1.3",
                    "blade_0_4": "127.0.1.4"
                }
            }
        }
    }
}
def iterate_over_clusters_in_node(input, node, lock, output):
    """ Iterate over the clusters in the node, then over the blades in the cluster. Add each blade to the output dictionary."""
    # `input` is a plain nested dict, `output` is a Manager().dict() proxy
    # shared between processes, and `lock` serialises stdout and `output` use.
    for cluster in input[node]['clusters']:
        for blade in input[node]['clusters'][cluster]['blades']:
            # Hold the lock while printing so lines from concurrent
            # processes do not interleave.
            with lock:
                print "node: " + node + ", node_IP: " + input[node]['IP'] + ", cluster: " + cluster + ", Blade: " + blade + ", cluster_IP: " + input[node]['clusters'][cluster]['blades'][blade]
            # Hold the lock across the read-modify-write of the shared dict.
            with lock:
                add_blade_to_output(input, node, cluster, blade, output)
def add_blade_to_output(input, node, cluster, blade, output):
    ''' Adds a blade to the managed output dictionary.

    BUG (this is the question's demonstration): `output` is a Manager dict
    proxy.  `output[node] = {}` stores a plain dict in the manager process,
    but `output[node]['IP'] = ...` mutates only a local copy fetched from
    the proxy -- the mutation is never propagated back.  Hence the KeyErrors
    noted in the comments below.
    '''
    if node not in output:
        output[node] = {}
        # These two writes land on a temporary copy returned by the proxy,
        # not on the value stored in the manager process.
        output[node]['IP'] = input[node]['IP']
        output[node]['clusters'] = {}
    # At this point, I would expect output[node]['IP'] and output[node]['clusters'] to exist
    # But the following print raises KeyError: 'IP'
    print output[node]['IP']
    if cluster not in output[node]['clusters']:
        # Raises KeyError: 'clusters'
        output[node]['clusters'][cluster] = {}
        output[node]['clusters'][cluster]['blades'] = {}
    output[node]['clusters'][cluster]['blades'][blade] = input[node]['clusters'][cluster]['blades'][blade]
if __name__ == "__main__":
# Create lock to ensure correct handling of output from multiple processes
lock = Lock()
# Create dictionary to hold any failed blades so that appropriate measures can be taken
# Must use a Manager so that the dictionary can be shared among processes
manager = Manager()
output = manager.dict()
# Create array to track our processes
procs = []
# Iterate over all nodes in input
for node in input:
p = Process(target = iterate_over_clusters_in_node, args = (input, node, lock, output))
p.start()
procs.append(p)
# Join processes and wait for them to stop
for p in procs:
p.join()
print "The final output is:"
print output
# Expectation: should print the same dictionary as the input
# Actual: prints "{'Node_2': {}, 'Node_1': {}}"
我是否需要将 manager.dict() 添加到输出 [node] 而不是内置字典类型?还是我做错了?
谢谢!
编辑:我不反对将其切换为 "threading" 实现而不是 "multiprocessing." 我是 运行 并行事物的新手,所以如果线程更适合这个内存共享的类型,请告诉我。
编辑:这是工作代码:
from multiprocessing import Process, Lock, Manager
# Define dictionary that will be used in this example
# Sample topology (same as in the question): two nodes, each with an IP
# address and two clusters, each cluster holding two blades (name -> IP).
# NOTE(review): the name `input` shadows the builtin input(); kept for fidelity.
input = {
    "Node_1": {
        "IP": "127.0.0.1",
        "clusters": {
            "cluster_1": {
                "blades": {
                    "blade_0_1": "127.0.1.1",
                    "blade_0_2": "127.0.1.2"
                }
            },
            "cluster_2": {
                "blades": {
                    "blade_0_3": "127.0.1.3",
                    "blade_0_4": "127.0.1.4"
                }
            }
        }
    },
    "Node_2": {
        "IP": "127.0.0.2",
        "clusters": {
            "cluster_1": {
                "blades": {
                    "blade_0_1": "127.0.1.1",
                    "blade_0_2": "127.0.1.2"
                }
            },
            "cluster_2": {
                "blades": {
                    "blade_0_3": "127.0.1.3",
                    "blade_0_4": "127.0.1.4"
                }
            }
        }
    }
}
# Create dictionary to hold any failed blades so that appropriate measures can be taken
# Must use a Manager so that the dictionary can be shared among processes
# NOTE(review): created at import time, outside the __main__ guard, so the
# worker functions can reference it as a module-level global.  This relies on
# fork-style process start; on spawn platforms (e.g. Windows) each child would
# re-create its own Manager here -- TODO confirm target platform.
manager = Manager()
output = manager.dict()
def iterate_over_clusters_in_node(input, node, lock):
    """ Iterate over the clusters in the node, then over the blades in the cluster. Add each blade to the output dictionary."""
    # Walks every blade of every cluster under `node` and copies it into the
    # module-level managed dict `output` via add_blade_to_output.
    for cluster in input[node]['clusters']:
        for blade in input[node]['clusters'][cluster]['blades']:
            # The lock makes the read-modify-write inside add_blade_to_output
            # atomic with respect to the other worker processes.
            with lock:
                add_blade_to_output(input, node, cluster, blade)
def add_blade_to_output(input, node, cluster, blade):
    ''' Adds a blade to the managed output dictionary.

    Works around the Manager-proxy limitation: mutating a nested value
    fetched from a dict proxy is not propagated back to the manager, so
    every change is built on a local plain dict and then pushed back in one
    output.update(...) call.  The caller must hold the shared lock, because
    this is a read-modify-write of the whole `output` contents.
    '''
    if node not in output:
        # First blade for this node: seed the node entry through update()
        # so the new value is actually stored by the manager.
        new_node = {}
        new_node[node] = {'IP': input[node]['IP'], 'clusters': {}}
        output.update(new_node)
    # Pull a plain-dict snapshot of the shared state and mutate it locally...
    new_node = {}
    new_node.update(output)
    if cluster not in output[node]['clusters']:
        new_node[node]['clusters'][cluster] = {}
        new_node[node]['clusters'][cluster]['blades'] = {blade: input[node]['clusters'][cluster]['blades'][blade]}
    else:
        new_node[node]['clusters'][cluster]['blades'][blade] = input[node]['clusters'][cluster]['blades'][blade]
    # ...then push the snapshot back through the proxy in a single call.
    output.update(new_node)
if __name__ == "__main__":
# Create lock to ensure correct handling of output from multiple processes
lock = Lock()
# Create array to track our processes
procs = []
# Iterate over all nodes in input
for node in input:
p = Process(target = iterate_over_clusters_in_node, args = (input, node, lock))
p.start()
procs.append(p)
# Join processes and wait for them to stop
for p in procs:
p.join()
print "The final output is:"
print output
根据 python 文档,
Modifications to mutable values or items in dict and list proxies will not be propagated through the manager, because the proxy has no way of knowing when its values or items are modified. To modify such an item, you can re-assign the modified object to the container proxy.
根据这些信息,我们可以相应地更新管理器:
#output[node] = {}
#output[node]['IP'] = input[node]['IP']
#output[node]['clusters'] = {} These changes are not propagated through the manager
new_node = {}
new_node[node] = {'IP': input[node]['IP'], 'clusters': {}}
output.update(new_node)
#if cluster not in output[node]['clusters']:
# Raises KeyError: 'clusters'
#output[node]['clusters'][cluster] = {}
#output[node]['clusters'][cluster]['blades'] = {}
#output[node]['clusters'][cluster]['blades'][blade] = input[node]['clusters'][cluster]['blades'][blade]
node_copy = output.copy()
if cluster not in node_copy[node]['clusters']:
node_copy[node]['clusters'].setdefault(cluster, {'blades': {}})
node_copy[node]['clusters'][cluster]['blades'][blade] = input[node]['clusters'][cluster]['blades'][blade]
output.update(node_copy)