Can I create a "global", fully connected graph in Python that is updated concurrently by workers during Pool.map() processing?
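I want to build a fully connected graph in Python in parallel and also get a list of edge values, e.g.:
( node1, node2 ) = edge_value
stored in dictionary format:
{ ( node1, node2 ) : edge_value [, ... [, ... ] ] }
To do this, I first initialize two global variables: G for the graph and f_correlation for that dictionary.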
import networkx as nx
from multiprocessing import Pool
G = nx.Graph()
f_correlation = {}
Then I create a function that constructs the graph and stores ( node1, node2 ) = edge_value in the f_correlation dictionary:
def construct_graph_parallelly(pair_with_df):
    global G
    global f_correlation
    pair, df = pair_with_df
    i, j = pair
    # calculate the edge value and store it in the global variable f_correlation
    f_correlation[ (i, j) ] = calculate_value(df, i, j) # this function calculate some value on the dataframe
    # here i, j are node in the graph
    G.add_edge(i, j, weight = f_correlation[ (i, j) ])
    return f_correlation
Then I create a multiprocessing.Pool() instance and call its .map() method so that the code runs concurrently:
def make_all_pair_with_df(node_list, df):
    all_pair_with_df = []
    for i in node_list:
        for j in node_list:
            if i != j :
                pair_with_df = (i,j),df
                all_pair_with_df.append(pair_with_df)
    return all_pair_with_df

node_list = ['a', 'b', 'c', 'd', 'e']
pool = Pool()
all_pair_with_df = make_all_pair_with_df(node_list, df)
f_correlation = pool.map(construct_graph_parallelly, all_pair_with_df)
pool.close()
print("DONE")
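But when I run the code, it runs forever and never prints "DONE".
One of the problems may be the global-variable issue discussed in Globals variables and Python multiprocessing.
But for my work I need to update the dictionary and the connected graph globally.
How can I do that, or what modifications should I make to get it working?
Update: let's relax the requirement and use multiprocessing only to build the f_correlation dictionary.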
With your current code, each process has its own copy of the global variables. You should use sharable, managed types (see multiprocessing.managers.SyncManager). For example:
import networkx as nx
from multiprocessing import Pool, Manager

# initialize this process's global variables:
def pool_initializer(the_dict):
    # initialize global variable with shared, managed dictionary
    global f_correlation
    f_correlation = the_dict

def construct_graph_parallelly(pair_with_df):
    global f_correlation
    pair, df = pair_with_df
    i, j = pair
    # calculate the edge value and store it in the shared dictionary f_correlation
    f_correlation[(i, j)] = calculate_value(df, i, j)  # this function calculates some value on the dataframe

def main():
    # node_list, df, calculate_value and make_all_pair_with_df are as in the question
    with Manager() as manager:  # create SyncManager instance
        f_correlation = manager.dict()  # create managed, shared dictionary
        # code to initialize G omitted
        with Pool(initializer=pool_initializer, initargs=(f_correlation,)) as pool:
            all_pair_with_df = make_all_pair_with_df(node_list, df)
            pool.map(construct_graph_parallelly, all_pair_with_df)
        # now build graph (still inside the Manager block, while the managed dict is alive)
        G = nx.Graph()
        for k, v in f_correlation.items():
            i, j = k  # unpack
            G.add_edge(i, j, weight=v)

if __name__ == '__main__':  # required for Windows
    main()
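If the shared dictionary is only needed once all workers have finished, a simpler variant may be to let each worker return its result and assemble the dictionary in the parent process, with no Manager at all. The following is only a sketch of that idea, not the approach from the answer above: edge_task and build_correlation are hypothetical names, calculate_value here is a stand-in (a dot product over plain lists instead of a dataframe), and itertools.combinations replaces the double loop so that each undirected pair is computed once.

```python
from itertools import combinations
from multiprocessing import Pool


def calculate_value(data, i, j):
    # Hypothetical stand-in for the question's calculate_value(df, i, j):
    # `data` is a plain dict of equal-length lists; the edge value is their dot product.
    return sum(x * y for x, y in zip(data[i], data[j]))


def edge_task(task):
    # Worker: compute one edge and return it instead of writing to a global.
    (i, j), data = task
    return (i, j), calculate_value(data, i, j)


def build_correlation(node_list, data, processes=2):
    # combinations() yields each unordered pair once, which is enough for an
    # undirected graph; the question's double loop yields both (i, j) and (j, i).
    tasks = [(pair, data) for pair in combinations(node_list, 2)]
    with Pool(processes=processes) as pool:
        # map() returns a list of ((i, j), value) tuples; dict() turns it into
        # the {(node1, node2): edge_value} mapping in the parent process.
        return dict(pool.map(edge_task, tasks))


if __name__ == "__main__":
    data = {"a": [1, 2], "b": [3, 4], "c": [5, 6]}
    f_correlation = build_correlation(["a", "b", "c"], data)
    print(f_correlation)
```

The resulting f_correlation is an ordinary dict, so the graph can then be built in the parent with the same G.add_edge(i, j, weight=v) loop as above. Each task still carries its own copy of the data to the worker, so for a large dataframe it may be worth passing it once through the Pool initializer instead.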