Outside vs. inside __main__ variable definition in multiprocessing
I have the following code:
import multiprocessing
import time
import os

# WHEN SEMAPHORE IS DEFINED HERE, IT WORKS
semaphore = multiprocessing.Semaphore(1)

def producer(num, output):
    semaphore.acquire()
    time.sleep(1)
    element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
    print "WRITE -> " + element
    output.put(element)
    time.sleep(1)
    semaphore.release()

if __name__ == '__main__':
    """
    Reads elements as soon as they are put inside the queue
    """
    output = multiprocessing.Manager().Queue()
    pool = multiprocessing.Pool(4)
    lst = range(40)

    # WHEN SEMAPHORE IS DEFINED HERE, IT DOES NOT WORK
    # semaphore = multiprocessing.Semaphore(1)

    for i in lst:
        pool.apply_async(producer, (i, output))
        # print "%d Do not wait!" % i
        # res.get()

    counter = 0
    while True:
        try:
            print "READ <- " + output.get_nowait()
            counter += 1
            if (counter == len(lst)):
                print "Break"
                break
        except:
            print "READ <- NOTHING IN BUFFER"
            pass
        time.sleep(1)
This code works as expected and prints:
READ <- NOTHING IN BUFFER
WRITE -> PROCESS: 0 PID: 15803 PPID: 15798
READ <- NOTHING IN BUFFER
READ <- PROCESS: 0 PID: 15803 PPID: 15798
READ <- NOTHING IN BUFFER
WRITE -> PROCESS: 1 PID: 15806 PPID: 15798
READ <- PROCESS: 1 PID: 15806 PPID: 15798
...
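Then I have this version, which does not work (it is essentially the same as the first, except that the semaphore is defined in a different place):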
import multiprocessing
import time
import os

# WHEN SEMAPHORE IS DEFINED HERE, IT WORKS
# semaphore = multiprocessing.Semaphore(1)

def producer(num, output):
    print hex(id(semaphore))
    semaphore.acquire()
    time.sleep(1)
    element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
    print "WRITE -> " + element
    output.put(element)
    time.sleep(1)
    semaphore.release()

if __name__ == '__main__':
    """
    Reads elements as soon as they are put inside the queue
    """
    output = multiprocessing.Manager().Queue()
    pool = multiprocessing.Pool(4)
    lst = range(40)

    # WHEN SEMAPHORE IS DEFINED HERE, IT DOES NOT WORK
    semaphore = multiprocessing.Semaphore(1)

    for i in lst:
        pool.apply_async(producer, (i, output))
        # print "%d Do not wait!" % i
        # res.get()

    counter = 0
    while True:
        try:
            print "READ <- " + output.get_nowait()
            counter += 1
            if (counter == len(lst)):
                print "Break"
                break
        except:
            print "READ <- NOTHING IN BUFFER"
            pass
        time.sleep(1)
This version prints:
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
READ <- NOTHING IN BUFFER
...
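It seems that producer never writes anything to the queue. I read somewhere that apply_async does not print error messages, so in the second version I changed pool.apply_async(producer, (i, output)) to pool.apply(producer, (i, output)) to see what was going on. It turns out that semaphore is not defined. Here is the output: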
Traceback (most recent call last):
  File "glob_var_wrong.py", line 31, in <module>
    pool.apply(producer, (i, output))
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 244, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
NameError: global name 'semaphore' is not defined
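The traceback shows that pool.apply is apply_async(...).get(), so the error can also be surfaced while keeping apply_async: hold on to each AsyncResult and call get() on it. The following is only a minimal sketch, written for Python 3 rather than the Python 2 used in the listings. The names broken_task, collect_errors and missing_name are hypothetical, and the start_method parameter is an assumption added so the start method can be chosen explicitly.

```python
import multiprocessing


def broken_task(num):
    # 'missing_name' is deliberately undefined; it is a hypothetical stand-in
    # for the semaphore that the worker cannot see.
    return missing_name + num


def collect_errors(n_tasks=3, start_method=None):
    """Submit tasks with apply_async and gather the errors that get() re-raises."""
    # None selects the platform's default start method.
    ctx = multiprocessing.get_context(start_method)
    errors = []
    with ctx.Pool(2) as pool:
        pending = [pool.apply_async(broken_task, (i,)) for i in range(n_tasks)]
        for result in pending:
            try:
                # get() re-raises, in the parent, whatever the worker raised.
                result.get(timeout=30)
            except NameError as exc:
                errors.append(str(exc))
    return errors
```

Called from under an if __name__ == '__main__': guard, collect_errors() should return one message per task, each naming missing_name. Without the get() calls the same errors would pass unnoticed.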
However, the following code runs correctly and prints 10 (the value defined in __main__):
global_var = 20

def print_global_var():
    print global_var

if __name__ == '__main__':
    global_var = 10
    print_global_var()
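In this code it seems to be possible to define a global variable in __main__, whereas in the previous code it is not. At first I assumed that variables defined in __main__ are not shared between processes, but this affects only semaphore and not output, pool, or lst. Why is that?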
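When you create a new process with multiprocessing.Process (which Pool uses behind the scenes), the new process only sees what existed at the moment it was created. With the fork start method, the default on Linux, it receives a copy of the parent's memory as it was at that point, and Pool(4) starts its worker processes as soon as it is called.
Because you did not define the variable semaphore before calling Pool(4), the variable is undefined in the worker processes that actually run the code, and the function producer raises an exception.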
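The timing effect can be reproduced in isolation. The sketch below is written for Python 3 and assumes a Unix system where the fork start method is available. The names read_marker, demo and marker are hypothetical. It creates one pool before a module-level name is assigned and one after, then asks a worker from each pool whether it can see the name.

```python
import multiprocessing


def read_marker():
    """Runs in a worker: return the module-level 'marker' if it exists there."""
    return globals().get("marker", "<undefined>")


def demo():
    global marker
    # Assumption: Unix with the fork start method (not available on Windows).
    ctx = multiprocessing.get_context("fork")

    globals().pop("marker", None)   # make sure the name does not exist yet
    early_pool = ctx.Pool(1)        # worker is forked here, before 'marker' exists
    marker = "defined"              # too late for early_pool's worker
    late_pool = ctx.Pool(1)         # worker is forked after the assignment
    try:
        return early_pool.apply(read_marker), late_pool.apply(read_marker)
    finally:
        for pool in (early_pool, late_pool):
            pool.close()
            pool.join()
```

Under these assumptions demo() should return ("<undefined>", "defined"): only the worker forked after the assignment inherits the name.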
To see this, change the definition to:
def producer(num, output):
    try:
        # Both lines raise NameError in a worker that has no semaphore.
        print hex(id(semaphore))
        semaphore.acquire()
    except Exception as e:
        print e
        return
    time.sleep(1)
    element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
    print "WRITE -> " + element
    output.put(element)
    time.sleep(1)
    semaphore.release()
Your failing code will now print a bunch (40) of errors that look like:
global name 'semaphore' is not defined
That is why the semaphore must be defined before calling Pool.
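An alternative that does not depend on where the semaphore is defined is to hand it to every worker explicitly through Pool's initializer and initargs arguments. This is a sketch in Python 3, not the approach taken in the question. The names init_worker, run and _semaphore are hypothetical, the queue is left out for brevity, and the start_method parameter is an assumption added for illustration.

```python
import multiprocessing
import os

_semaphore = None  # set in each worker by init_worker


def init_worker(sem):
    """Runs once in every worker process when the pool starts it."""
    global _semaphore
    _semaphore = sem


def producer(num):
    # The semaphore is a context manager: acquire on entry, release on exit.
    with _semaphore:
        return "PROCESS: %d PID: %d" % (num, os.getpid())


def run(n_tasks=4, start_method=None):
    # None selects the platform's default start method.
    ctx = multiprocessing.get_context(start_method)
    sem = ctx.Semaphore(1)
    with ctx.Pool(2, initializer=init_worker, initargs=(sem,)) as pool:
        pending = [pool.apply_async(producer, (i,)) for i in range(n_tasks)]
        # get() also re-raises any exception from the worker.
        return [result.get(timeout=30) for result in pending]
```

Called from under an if __name__ == '__main__': guard, run() returns one string per task. Because the semaphore reaches the workers as an argument, the order of the definitions in the main block no longer matters.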
This happens because you executed the code on Windows. You will get the expected result on Linux.
This is the difference between fork and spawn.
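The difference can be probed with a small helper that assigns a module-level name at run time, as a __main__ block would, and then asks a freshly started child whether it sees that name. This is a sketch in Python 3. The names has_late_global, visible_in_child and late_global are hypothetical.

```python
import multiprocessing


def has_late_global():
    """Runs in the child: is the name assigned at run time visible here?"""
    return "late_global" in globals()


def visible_in_child(start_method):
    """Assign a module-level name at run time, then ask a fresh child about it."""
    # Stands in for an assignment made inside the __main__ block.
    globals()["late_global"] = 10
    ctx = multiprocessing.get_context(start_method)
    with ctx.Pool(1) as pool:
        return pool.apply(has_late_global)
```

On a Unix system visible_in_child("fork") should return True, because the child is a copy of the parent at the time of the fork. With "spawn", the only start method on Windows, the child re-imports the main module without running the code under the __main__ guard, so a call made from under that guard should return False.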