Python Pandas 多处理没有结果 return

Python Pandas multiprocessing no result return

我有一个df,你可以复制粘贴:

import pandas as pd
from io import StringIO

df = """
  ValOption  RB test 
0       SLA  4  3       
1       AC   5  4           
2       SLA  5  5          
3       AC   2  4       
4       SLA  5  5         
5       AC   3  4          
6       SLA  4  3         

"""
df = pd.read_csv(StringIO(df.strip()), sep='\s+')

输出:

ValOption   RB  test
0     SLA   4   3
1     AC    5   4
2     SLA   5   5
3     AC    2   4
4     SLA   5   5
5     AC    3   4
6     SLA   4   3

然后我有 2 个函数可以为此 df 构建新列:

def func1():
    df['r1']=df['test']+1
    return df['r1']

def func2():
    df['r2']=df['RB']+1
    return df['r2']

调用这两个函数后:

func1()
func2()

输出:

ValOption   RB  test    r1  r2
0    SLA    4   3      4    5
1     AC    5   4      5    6
2     SLA   5   5      6    6
3     AC    2   4      5    3
4     SLA   5   5      6    6
5     AC    3   4      5    4
6     SLA   4   3      4    5

但是当我尝试使用多处理时,我无法获得新列:

import multiprocessing
if __name__ ==  '__main__':

    p1 = multiprocessing.Process(target=func1)
    p2 = multiprocessing.Process(target=func2)

    p1.start()
    p2.start()

    p1.join()
    p2.join()

输出:

ValOption   RB  test
0    SLA    4   3
1     AC    5   4
2    SLA    5   5
3     AC    2   4
4    SLA    5   5
5     AC    3   4
6    SLA    4   3

多处理没有return函数中的值。有朋友可以帮忙吗?

Pandas 数据帧绝对不是线程安全的。想一想如果当 func2 完成时 func1 执行到一半会发生什么! (而且 pandas 绝对不是原子的)。

幸运的是 multiprocessing 刚刚复制了变量并在副本上工作(实际上它已经 序列化 变量并将其发送到子进程)。所以如果你想在 multiprocessing 中工作,你采用这个工作流程:

  • 将任务分成几个步骤
  • 工作函数采取步骤并计算结果
  • 编译结果并将它们应用回对象

查看 multiprocessing.Pool 的一些教程,了解这是如何完成的。

我猜您正在使用笔记本,并且正在尝试包含 if __name__ == '__main__': 的单元格?

如果是这样,只是 运行 它外面的函数 - 就像那样:

import multiprocessing

p1 = multiprocessing.Process(target=func1)
p2 = multiprocessing.Process(target=func2)

p1.start()
p2.start()

p1.join()
p2.join()

或者保留它,但在这种情况下,将其作为 python 文件执行。

如果您使用 multiprocessing.Pool 并重写您的函数以使其更通用,您可以使用将输入映射到输出:

>>> def func(series):
...   return series + 1
... 
>>> with multiprocessing.Pool(2) as p:
...   dat = p.map(func, [df['test'].rename('r1'), df['RB'].rename('r2')])
... 

然后在并行处理之外,用获得的结果修改数据帧,例如使用df.join():

>>> df.join(dat)
  ValOption  RB  test  r1  r2
0       SLA   4     3   4   5
1        AC   5     4   5   6
2       SLA   5     5   6   6
3        AC   2     4   5   3
4       SLA   5     5   6   6
5        AC   3     4   5   4
6       SLA   4     3   4   5

否则,获得结果的最佳方式是任务队列,请参阅 bottom of the multiprocessing page 中的示例。同样,您希望在任务中进行计算,但不修改任何共享数据结构。他们执行后你可以再次加入他们。

更复杂的解决方案可能不得不求助于 multiprocessing.Manager 子类,因为 pandas 系列和数据帧是不适合 multiprocessing.sharedctypes.* 选项的复杂对象。

好的,然后通过创建 class 来更改您的代码:

from multiprocessing import Process

class Test:
    def __init__(self, df):
        self.df = df
        
    def func1(self):
        df['r1'] = df['test']+1

    def func2(self):
        df['r2'] = df['RB']+1

p1 = Process(target=Test(df).func1())
p2 = Process(target=Test(df).func2())

p1.start()
p2.start()

p1.join()
p2.join()

这肯定有效