我可以在 python 中创建一个 "global" 完全连接的图,在 Pool.map() 处理中从并发工作人员同时更新吗?

Can I create a "global", fully connected graph in python, being concurrently updated from concurrent workers in a Pool.map() processing?

我想在 python 中并行构建一个完全连接的图,并获得边值列表,例如:
( node1, node2 ) = edge_value
以字典格式存储:
{ ( node1, node2 ) : edge_value [, ... [, ... ] ] }

为此,我首先要初始化两个 global 变量,G 用于图形,f_correlation 用于所述字典

import networkx as nx
from multiprocessing import Pool
G = nx.Graph()
f_correlation = {}

然后创建一个函数来构造图形,并将
( node1, node2 ) = edge_value存储到f_correlation字典中:

def construct_graph_parallelly(pair_with_df):
    global G
    global f_correlation
    pair, df = pair_with_df
    i, j = pair
    # calculate the edge value and store it in the global variable f_correlation
    f_correlation[ (i, j) ] = calculate_value(df, i, j)    # this function calculate some value on the dataframe
    # here i, j are node in the graph 
    G.add_edge(i, j, weight = f_correlation[ (i, j) ])
    return f_correlation

然后创建一个 multiprocessing.Pool()-实例并调用它的 .map()-方法是 运行,让代码并发执行:

def make_all_pair_with_df(node_list, df):
    all_pair_with_df = []
    for i in node_list:
        for j in node_list:
            if i != j :
                pair_with_df = (i,j),df
                all_pair_with_df.append(pair_with_df)

    return all_pair_with_df

node_list = ['a', 'b', 'c', 'd', 'e']
pool = Pool()
all_pair_with_df = make_all_pair_with_df(node_list, df) 
f_correlation = pool.map(construct_graph_parallelly, all_pair_with_df)
pool.close()
print("DONE")

但是当我 运行 代码它 运行 无限时,从不打印“DONE”

其中一个问题可能是 global-变量问题,在 Globals variables and Python multiprocessing

中讨论

但是为了我的工作,我需要全局更新字典和连通图

我该怎么做或者我应该做哪些修改才能使它工作?

更新:让我们放宽心,使用多处理来构建 f_correlation 字典。

使用您当前的代码,每个进程都有自己的全局变量副本。您应该使用 sharablemanaged 类型(参见 multiprocessing.SyncManager)。例如:

from multiprocessing import Pool, Manager

# initialize this process's global variables:
def pool_initializer(the_dict):
    # initialize global variable with shared, managed dictionary
    global f_correlation
    f_correlation = the_dict

def construct_graph_parallelly(pair_with_df):
    global f_correlation
    pair, df = pair_with_df
    i, j = pair
    # calculate the edge value and store it in the global variable f_correlation
    f_correlation[(i, j)] = calculate_value(df, i, j)    # this function calculate some value on the dataframe

def main():    
    with Manager() as manager: # create SyncManager instance
        f_correlation = manager.dict() # create managed, shared dictionary
        # code to initialize G omitted
        with Pool(initializer=pool_initializer, initargs=(f_correlation,)) as pool:
            all_pair_with_df = make_all_pair_with_df(node_list, df) 
            pool.map(construct_graph_parallelly, all_pair_with_df)
            # now build graph
            G = nx.Graph()
            for k, v in f_correlation.items():
               i, j = k # unpack
               G.add_edge(i, j, weight=v)
    
if __name__ == '__main__': # required for Windows
    main()