全局变量如何在 Python 的并行编程中工作?
How global variable works in parallel programming with Python?
我有这个代码。在顺序方法中打印消息“no ok”,而在并行方法中打印消息 ["ok", "ok", "ok"] 而不是 ["not ok", "not ok", "不正常"],这是我的预期。
如何在不将变量 globVar 作为“测试”函数中的参数的情况下更改它?
import multiprocessing
global globVar
globVar = 'ok'
def test(arg1):
print(arg1)
return globVar
if __name__ == "__main__" :
globVar = 'not ok'
#Sequential
print(test(0))
#Parallel
pool = multiprocessing.Pool()
argList = [0,1,2]
result = pool.map(test,argList)
pool.close()
TL;DR. 您可以跳到最后一段寻找解决方案或阅读所有内容以了解实际情况。
您没有使用您的平台(例如 windows
或 linux
)标记您的问题作为发布带有 multiprocessing
标记的问题的指南;全局变量的行为(Anglos 的“行为”)在很大程度上取决于平台。
在使用方法 spawn
创建新进程的平台上,例如 Windows,要在使用 pool = multiprocessing.Pool()
语句创建的池中创建和初始化每个进程,一个创建新的空地址 space 并启动新的 Python 解释器,重新读取并重新执行源程序,以便在最终调用辅助函数之前初始化地址 space test
。这意味着全局范围内的每个语句,即导入语句、变量声明、函数声明等,都是为此目的而执行的。但是,在新的子流程变量 __name__
中, 不会 为“__main__”,因此不会执行 if __name__ == "__main__" :
块中的任何语句。这就是为什么对于 Windows 平台,您必须将创建新进程的代码放在这样的块中。如果不这样做,将导致无限递归进程创建循环,否则无法检测到。
因此,如果您在 Windows 下处于 运行,您的主进程在创建池之前已将 globVar
设置为 'not ok'。但是,当进程在调用 test
之前初始化时,您的源将被重新执行,每个进程 在其自己的地址 space 中运行,因此具有自己的 globVar
将该变量重新初始化为 'ok'. That 是 test
将看到的值,前面的语句暗示修改globVar
的本地副本不会反映回主进程。
现在在使用 fork
创建新进程的平台上,例如 Linux
,情况有点不同。创建子进程时,每个子进程都以只读方式继承父进程的地址 space,并且仅当它尝试修改内存时才会获得副本(“写时复制”)。这显然是一种更有效的流程创建机制。因此,在这种情况下,test
将看到 globVar
的值为 'not ok',因为这是创建子流程时的值。但是如果 test
更新 globVar
,“写时复制”机制将确保它正在更新本地地址 space 中存在的 globVar
。因此,主进程将再次看不到更新后的值。
因此,让工作函数 返回 值作为您的 test
函数正在做的是反映回主进程结果的标准方法。 您的问题是 您没有以预期的 globVar
值开始。 这可以通过初始化池的进程来解决使用 initializer 和 initargs 参数到 Pool
构造函数的正确 globVar
值(参见 documentation ):
import multiprocessing
global globVar
globVar = 'ok'
def init_processes(gVar):
global globVar
globVar = gVar
def test(arg1):
print(arg1)
return globVar
if __name__ == "__main__" :
globVar = 'not ok'
#Sequential
print(test(0))
#Parallel
pool = multiprocessing.Pool(initializer=init_processes, initargs=(globVar,))
argList = [0,1,2]
result = pool.map(test,argList)
pool.close()
print(result)
打印:
0
not ok
0
1
2
['not ok', 'not ok', 'not ok']
在 windows 机器上,我尝试了一个多处理,通过它,一个全局列表变量被一些函数操作。在这个例子中,系统无法操作全局变量。实际上,我们试图通过一些函数并以并行的方式将一些新元素添加到列表中;但名单最终没有填满:
from multiprocessing import Pool, freeze_support
import multiprocessing
global mylist
mylist = []
def f1(a):
global mylist
mylist.append(a)
return mylist
def f2(a):
global mylist
mylist.append(a)
return mylist
def f3(a):
global mylist
mylist.append(a)
return mylist
if __name__ == '__main__':
pool = Pool(multiprocessing.cpu_count())
freeze_support()
r1 = pool.apply_async(f1,[1])
r2 = pool.apply_async(f2,[2])
r3 = pool.apply_async(f3,[3])
r4 = pool.apply_async(f1,[4])
r5 = pool.apply_async(f2,[5])
r6 = pool.apply_async(f3,[6])
a1 = r1.get(timeout=234)
a2 = r2.get(timeout=234)
a3 = r3.get(timeout=234)
a4 = r4.get(timeout=234)
a5 = r5.get(timeout=234)
a6 = r6.get(timeout=234)
print(mylist)
输出为:
[]
我分析一下,当你创建多进程时,内存被完全分成separated/independent块或段。并且进程之间没有共享内存。这是具有 Python 多处理功能的事实。主进程声明一个全局变量,然后其他三个子进程将为其自己的范围定义另外三个全局变量。然而,程序员看到了一个全局变量。
但是,不用担心,下面是 above-mentioned 示例的解决方案:
from multiprocessing import Pool, freeze_support
import multiprocessing
from multiprocessing import Process,Manager
def f1(a):
l = []
l.append(a)
return l
def f2(a):
l = []
l.append(a)
return l
def f3(a):
l = []
l.append(a)
return l
if __name__ == '__main__':
pool = Pool(multiprocessing.cpu_count())
freeze_support()
r1 = pool.apply_async(f1,[1])
r2 = pool.apply_async(f2,[2])
r3 = pool.apply_async(f3,[3])
r4 = pool.apply_async(f1,[4])
r5 = pool.apply_async(f2,[5])
r6 = pool.apply_async(f3,[6])
a1 = r1.get(timeout=234)
a2 = r2.get(timeout=234)
a3 = r3.get(timeout=234)
a4 = r4.get(timeout=234)
a5 = r5.get(timeout=234)
a6 = r6.get(timeout=234)
mylist = a1 + a2 + a3 + a4 + a5 + a6
print(mylist)
输出为:
[1, 2, 3, 4, 5, 6]
解决方法是你需要将每个并行处理函数的输出打包。
实际上,这是我发现使用 Python 的唯一策略,当我尝试其他方法(例如多处理管理器)时,它无法正常工作。
我有这个代码。在顺序方法中打印消息“no ok”,而在并行方法中打印消息 ["ok", "ok", "ok"] 而不是 ["not ok", "not ok", "不正常"],这是我的预期。
如何在不将变量 globVar 作为“测试”函数中的参数的情况下更改它?
import multiprocessing
global globVar
globVar = 'ok'
def test(arg1):
print(arg1)
return globVar
if __name__ == "__main__" :
globVar = 'not ok'
#Sequential
print(test(0))
#Parallel
pool = multiprocessing.Pool()
argList = [0,1,2]
result = pool.map(test,argList)
pool.close()
TL;DR. 您可以跳到最后一段寻找解决方案或阅读所有内容以了解实际情况。
您没有使用您的平台(例如 windows
或 linux
)标记您的问题作为发布带有 multiprocessing
标记的问题的指南;全局变量的行为(Anglos 的“行为”)在很大程度上取决于平台。
在使用方法 spawn
创建新进程的平台上,例如 Windows,要在使用 pool = multiprocessing.Pool()
语句创建的池中创建和初始化每个进程,一个创建新的空地址 space 并启动新的 Python 解释器,重新读取并重新执行源程序,以便在最终调用辅助函数之前初始化地址 space test
。这意味着全局范围内的每个语句,即导入语句、变量声明、函数声明等,都是为此目的而执行的。但是,在新的子流程变量 __name__
中, 不会 为“__main__”,因此不会执行 if __name__ == "__main__" :
块中的任何语句。这就是为什么对于 Windows 平台,您必须将创建新进程的代码放在这样的块中。如果不这样做,将导致无限递归进程创建循环,否则无法检测到。
因此,如果您在 Windows 下处于 运行,您的主进程在创建池之前已将 globVar
设置为 'not ok'。但是,当进程在调用 test
之前初始化时,您的源将被重新执行,每个进程 在其自己的地址 space 中运行,因此具有自己的 globVar
将该变量重新初始化为 'ok'. That 是 test
将看到的值,前面的语句暗示修改globVar
的本地副本不会反映回主进程。
现在在使用 fork
创建新进程的平台上,例如 Linux
,情况有点不同。创建子进程时,每个子进程都以只读方式继承父进程的地址 space,并且仅当它尝试修改内存时才会获得副本(“写时复制”)。这显然是一种更有效的流程创建机制。因此,在这种情况下,test
将看到 globVar
的值为 'not ok',因为这是创建子流程时的值。但是如果 test
更新 globVar
,“写时复制”机制将确保它正在更新本地地址 space 中存在的 globVar
。因此,主进程将再次看不到更新后的值。
因此,让工作函数 返回 值作为您的 test
函数正在做的是反映回主进程结果的标准方法。 您的问题是 您没有以预期的 globVar
值开始。 这可以通过初始化池的进程来解决使用 initializer 和 initargs 参数到 Pool
构造函数的正确 globVar
值(参见 documentation ):
import multiprocessing
global globVar
globVar = 'ok'
def init_processes(gVar):
global globVar
globVar = gVar
def test(arg1):
print(arg1)
return globVar
if __name__ == "__main__" :
globVar = 'not ok'
#Sequential
print(test(0))
#Parallel
pool = multiprocessing.Pool(initializer=init_processes, initargs=(globVar,))
argList = [0,1,2]
result = pool.map(test,argList)
pool.close()
print(result)
打印:
0
not ok
0
1
2
['not ok', 'not ok', 'not ok']
在 windows 机器上,我尝试了一个多处理,通过它,一个全局列表变量被一些函数操作。在这个例子中,系统无法操作全局变量。实际上,我们试图通过一些函数并以并行的方式将一些新元素添加到列表中;但名单最终没有填满:
from multiprocessing import Pool, freeze_support
import multiprocessing
global mylist
mylist = []
def f1(a):
global mylist
mylist.append(a)
return mylist
def f2(a):
global mylist
mylist.append(a)
return mylist
def f3(a):
global mylist
mylist.append(a)
return mylist
if __name__ == '__main__':
pool = Pool(multiprocessing.cpu_count())
freeze_support()
r1 = pool.apply_async(f1,[1])
r2 = pool.apply_async(f2,[2])
r3 = pool.apply_async(f3,[3])
r4 = pool.apply_async(f1,[4])
r5 = pool.apply_async(f2,[5])
r6 = pool.apply_async(f3,[6])
a1 = r1.get(timeout=234)
a2 = r2.get(timeout=234)
a3 = r3.get(timeout=234)
a4 = r4.get(timeout=234)
a5 = r5.get(timeout=234)
a6 = r6.get(timeout=234)
print(mylist)
输出为:
[]
我分析一下,当你创建多进程时,内存被完全分成separated/independent块或段。并且进程之间没有共享内存。这是具有 Python 多处理功能的事实。主进程声明一个全局变量,然后其他三个子进程将为其自己的范围定义另外三个全局变量。然而,程序员看到了一个全局变量。
但是,不用担心,下面是 above-mentioned 示例的解决方案:
from multiprocessing import Pool, freeze_support
import multiprocessing
from multiprocessing import Process,Manager
def f1(a):
l = []
l.append(a)
return l
def f2(a):
l = []
l.append(a)
return l
def f3(a):
l = []
l.append(a)
return l
if __name__ == '__main__':
pool = Pool(multiprocessing.cpu_count())
freeze_support()
r1 = pool.apply_async(f1,[1])
r2 = pool.apply_async(f2,[2])
r3 = pool.apply_async(f3,[3])
r4 = pool.apply_async(f1,[4])
r5 = pool.apply_async(f2,[5])
r6 = pool.apply_async(f3,[6])
a1 = r1.get(timeout=234)
a2 = r2.get(timeout=234)
a3 = r3.get(timeout=234)
a4 = r4.get(timeout=234)
a5 = r5.get(timeout=234)
a6 = r6.get(timeout=234)
mylist = a1 + a2 + a3 + a4 + a5 + a6
print(mylist)
输出为:
[1, 2, 3, 4, 5, 6]
解决方法是你需要将每个并行处理函数的输出打包。
实际上,这是我发现使用 Python 的唯一策略,当我尝试其他方法(例如多处理管理器)时,它无法正常工作。