Python Pandas 多处理没有结果 return
Python Pandas multiprocessing no result return
我有一个df,你可以复制粘贴:
import pandas as pd
from io import StringIO
df = """
ValOption RB test
0 SLA 4 3
1 AC 5 4
2 SLA 5 5
3 AC 2 4
4 SLA 5 5
5 AC 3 4
6 SLA 4 3
"""
df = pd.read_csv(StringIO(df.strip()), sep='\s+')
输出:
ValOption RB test
0 SLA 4 3
1 AC 5 4
2 SLA 5 5
3 AC 2 4
4 SLA 5 5
5 AC 3 4
6 SLA 4 3
然后我有 2 个函数可以为此 df 构建新列:
def func1():
df['r1']=df['test']+1
return df['r1']
def func2():
df['r2']=df['RB']+1
return df['r2']
调用这两个函数后:
func1()
func2()
输出:
ValOption RB test r1 r2
0 SLA 4 3 4 5
1 AC 5 4 5 6
2 SLA 5 5 6 6
3 AC 2 4 5 3
4 SLA 5 5 6 6
5 AC 3 4 5 4
6 SLA 4 3 4 5
但是当我尝试使用多处理时,我无法获得新列:
import multiprocessing
if __name__ == '__main__':
p1 = multiprocessing.Process(target=func1)
p2 = multiprocessing.Process(target=func2)
p1.start()
p2.start()
p1.join()
p2.join()
输出:
ValOption RB test
0 SLA 4 3
1 AC 5 4
2 SLA 5 5
3 AC 2 4
4 SLA 5 5
5 AC 3 4
6 SLA 4 3
多处理没有return函数中的值。有朋友可以帮忙吗?
Pandas 数据帧绝对不是线程安全的。想一想如果当 func2 完成时 func1 执行到一半会发生什么! (而且 pandas 绝对不是原子的)。
幸运的是 multiprocessing 刚刚复制了变量并在副本上工作(实际上它已经 序列化 变量并将其发送到子进程)。所以如果你想在 multiprocessing 中工作,你采用这个工作流程:
- 将任务分成几个步骤
- 工作函数采取步骤并计算结果
- 编译结果并将它们应用回对象
查看 multiprocessing.Pool
的一些教程,了解这是如何完成的。
我猜您正在使用笔记本,并且正在尝试包含 if __name__ == '__main__':
的单元格?
如果是这样,只是 运行 它外面的函数 - 就像那样:
import multiprocessing
p1 = multiprocessing.Process(target=func1)
p2 = multiprocessing.Process(target=func2)
p1.start()
p2.start()
p1.join()
p2.join()
或者保留它,但在这种情况下,将其作为 python 文件执行。
如果您使用 multiprocessing.Pool
并重写您的函数以使其更通用,您可以使用将输入映射到输出:
>>> def func(series):
... return series + 1
...
>>> with multiprocessing.Pool(2) as p:
... dat = p.map(func, [df['test'].rename('r1'), df['RB'].rename('r2')])
...
然后在并行处理之外,用获得的结果修改数据帧,例如使用df.join()
:
>>> df.join(dat)
ValOption RB test r1 r2
0 SLA 4 3 4 5
1 AC 5 4 5 6
2 SLA 5 5 6 6
3 AC 2 4 5 3
4 SLA 5 5 6 6
5 AC 3 4 5 4
6 SLA 4 3 4 5
否则,获得结果的最佳方式是任务队列,请参阅 bottom of the multiprocessing page 中的示例。同样,您希望在任务中进行计算,但不修改任何共享数据结构。他们执行后你可以再次加入他们。
更复杂的解决方案可能不得不求助于 multiprocessing.Manager
子类,因为 pandas 系列和数据帧是不适合 multiprocessing.sharedctypes.*
选项的复杂对象。
好的,然后通过创建 class 来更改您的代码:
from multiprocessing import Process
class Test:
def __init__(self, df):
self.df = df
def func1(self):
df['r1'] = df['test']+1
def func2(self):
df['r2'] = df['RB']+1
p1 = Process(target=Test(df).func1())
p2 = Process(target=Test(df).func2())
p1.start()
p2.start()
p1.join()
p2.join()
这肯定有效
我有一个df,你可以复制粘贴:
import pandas as pd
from io import StringIO
df = """
ValOption RB test
0 SLA 4 3
1 AC 5 4
2 SLA 5 5
3 AC 2 4
4 SLA 5 5
5 AC 3 4
6 SLA 4 3
"""
df = pd.read_csv(StringIO(df.strip()), sep='\s+')
输出:
ValOption RB test
0 SLA 4 3
1 AC 5 4
2 SLA 5 5
3 AC 2 4
4 SLA 5 5
5 AC 3 4
6 SLA 4 3
然后我有 2 个函数可以为此 df 构建新列:
def func1():
df['r1']=df['test']+1
return df['r1']
def func2():
df['r2']=df['RB']+1
return df['r2']
调用这两个函数后:
func1()
func2()
输出:
ValOption RB test r1 r2
0 SLA 4 3 4 5
1 AC 5 4 5 6
2 SLA 5 5 6 6
3 AC 2 4 5 3
4 SLA 5 5 6 6
5 AC 3 4 5 4
6 SLA 4 3 4 5
但是当我尝试使用多处理时,我无法获得新列:
import multiprocessing
if __name__ == '__main__':
p1 = multiprocessing.Process(target=func1)
p2 = multiprocessing.Process(target=func2)
p1.start()
p2.start()
p1.join()
p2.join()
输出:
ValOption RB test
0 SLA 4 3
1 AC 5 4
2 SLA 5 5
3 AC 2 4
4 SLA 5 5
5 AC 3 4
6 SLA 4 3
多处理没有return函数中的值。有朋友可以帮忙吗?
Pandas 数据帧绝对不是线程安全的。想一想如果当 func2 完成时 func1 执行到一半会发生什么! (而且 pandas 绝对不是原子的)。
幸运的是 multiprocessing 刚刚复制了变量并在副本上工作(实际上它已经 序列化 变量并将其发送到子进程)。所以如果你想在 multiprocessing 中工作,你采用这个工作流程:
- 将任务分成几个步骤
- 工作函数采取步骤并计算结果
- 编译结果并将它们应用回对象
查看 multiprocessing.Pool
的一些教程,了解这是如何完成的。
我猜您正在使用笔记本,并且正在尝试包含 if __name__ == '__main__':
的单元格?
如果是这样,只是 运行 它外面的函数 - 就像那样:
import multiprocessing
p1 = multiprocessing.Process(target=func1)
p2 = multiprocessing.Process(target=func2)
p1.start()
p2.start()
p1.join()
p2.join()
或者保留它,但在这种情况下,将其作为 python 文件执行。
如果您使用 multiprocessing.Pool
并重写您的函数以使其更通用,您可以使用将输入映射到输出:
>>> def func(series):
... return series + 1
...
>>> with multiprocessing.Pool(2) as p:
... dat = p.map(func, [df['test'].rename('r1'), df['RB'].rename('r2')])
...
然后在并行处理之外,用获得的结果修改数据帧,例如使用df.join()
:
>>> df.join(dat)
ValOption RB test r1 r2
0 SLA 4 3 4 5
1 AC 5 4 5 6
2 SLA 5 5 6 6
3 AC 2 4 5 3
4 SLA 5 5 6 6
5 AC 3 4 5 4
6 SLA 4 3 4 5
否则,获得结果的最佳方式是任务队列,请参阅 bottom of the multiprocessing page 中的示例。同样,您希望在任务中进行计算,但不修改任何共享数据结构。他们执行后你可以再次加入他们。
更复杂的解决方案可能不得不求助于 multiprocessing.Manager
子类,因为 pandas 系列和数据帧是不适合 multiprocessing.sharedctypes.*
选项的复杂对象。
好的,然后通过创建 class 来更改您的代码:
from multiprocessing import Process
class Test:
def __init__(self, df):
self.df = df
def func1(self):
df['r1'] = df['test']+1
def func2(self):
df['r2'] = df['RB']+1
p1 = Process(target=Test(df).func1())
p2 = Process(target=Test(df).func2())
p1.start()
p2.start()
p1.join()
p2.join()
这肯定有效