如何在 python 中的多处理目标函数中对全局数据帧执行?

How to perform on global dataframe in the target function of multiprocessing in python?

我有以下代码。我想使用 calculate_mi 函数在全局数据帧 df 和 python 多进程上计算所有对的值。

from multiprocess import Pool

def calculate_mi(pair):
  global df
  from pyitlib import discrete_random_variable as drv
  import numpy as np
  i, j = pair
  val = ( 2*drv.information_mutual(df[i].values.astype(np.int32), df[j].values.astype(np.int32)) ) / ( drv.entropy(df[i].values.astype(np.int32)) + drv.entropy(df[j].values.astype(np.int32)) )
  return (i,j), val

def calculate_value(t_df):
  global df
  df = t_df
  all_pair = [('1', '2'), ('1', '3'), ('2', '1'), ('2', '3'), ('3', '1'), ('3', '2')]

  pool = Pool()
  pair_value_list = pool.map(calculate_mi, all_pair)
  pool.close()
  print(pair_value_list)

def calc():
  data = {'1':[1, 0, 1, 1],
    '2':[0, 1, 1, 0],
    '3':[1, 1, 0, 1],
    '0':[0, 1, 0, 1] }

  t_df = pd.DataFrame(data)
  calculate_value(t_df)

if __name__ == '__main__':
  calc()

此代码为我提供了 google colab 平台中的预期输出。但是当我在我的本地机器上 运行 它时,它给出了以下错误。 (我正在使用 windows 10,anaconda,jupyter notebook,python 3.6.9)。我该如何解决这个问题或者还有其他方法吗? RemoteTraceback Traceback (most recent call last), ... NameError: name 'df' is not defined

首先,有几件事:

  1. 应该是:from multiprocessing import Pool(不是from multiprocess
  2. 您似乎遗漏了 pandas 库的导入。

继续...

问题是在 Windows 下,新进程的创建不是使用 fork 调用完成的,因此子进程不会自动继承全局变量,例如 df .因此,您必须在创建 Pool:

时使用初始化程序初始化每个子流程,使全局变量 df 正确初始化
from multiprocessing import Pool
import pandas as pd

def calculate_mi(pair):
  global df
  from pyitlib import discrete_random_variable as drv
  import numpy as np
  i, j = pair
  val = ( 2*drv.information_mutual(df[i].values.astype(np.int32), df[j].values.astype(np.int32)) ) / ( drv.entropy(df[i].values.astype(np.int32)) + drv.entropy(df[j].values.astype(np.int32)) )
  return (i,j), val

# initialize global variable df for each sub-process
def initpool(t_df):
    global df
    df = t_df

def calculate_value(t_df):
  all_pair = [('1', '2'), ('1', '3'), ('2', '1'), ('2', '3'), ('3', '1'), ('3', '2')]

  # make sure each sub-process has global variable df properly initialized:    
  pool = Pool(initializer=initpool, initargs=(t_df,))
  pair_value_list = pool.map(calculate_mi, all_pair)
  pool.close()
  print(pair_value_list)

def calc():
  data = {'1':[1, 0, 1, 1],
    '2':[0, 1, 1, 0],
    '3':[1, 1, 0, 1],
    '0':[0, 1, 0, 1] }

  t_df = pd.DataFrame(data)
  calculate_value(t_df)

if __name__ == '__main__':
  calc()