Variable definition outside vs. inside __main__ in multiprocessing

I have the following code:

import multiprocessing
import time
import os

# WHEN SEMAPHORE IS DEFINED HERE THEN IT WORKS
semaphore = multiprocessing.Semaphore(1)

def producer(num, output):
  semaphore.acquire()
  time.sleep(1)
  element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
  print "WRITE -> " + element
  output.put(element)
  time.sleep(1)
  semaphore.release()

if __name__ == '__main__':
    """
    Reads elements as soon as they are put inside queue
    """

    output    = multiprocessing.Manager().Queue()
    pool      = multiprocessing.Pool(4)
    lst       = range(40)

    # WHEN SEMAPHORE IS DEFINED HERE THEN IT DOES NOT WORK
    # semaphore = multiprocessing.Semaphore(1)

    for i in lst:
        pool.apply_async(producer, (i, output))
        # print "%d Do not wait!" % i
        # res.get()

    counter = 0
    while True:
      try:
        print "READ  <- " + output.get_nowait()
        counter += 1
        if (counter == len(lst)):
          print "Break"
          break
      except:
        print "READ  <- NOTHING IN BUFFER"  
        pass
      time.sleep(1)

This code works as expected and prints:

READ  <- NOTHING IN BUFFER
WRITE -> PROCESS: 0 PID: 15803 PPID: 15798
READ  <- NOTHING IN BUFFER
READ  <- PROCESS: 0 PID: 15803 PPID: 15798
READ  <- NOTHING IN BUFFER
WRITE -> PROCESS: 1 PID: 15806 PPID: 15798
READ  <- PROCESS: 1 PID: 15806 PPID: 15798
...

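Then I have this version, which does not work (it is essentially the same as the first one, except that the semaphore is defined in a different place):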

import multiprocessing
import time
import os

# WHEN SEMAPHORE IS DEFINED HERE THEN IT WORKS
# semaphore = multiprocessing.Semaphore(1)

def producer(num, output):
  print hex(id(semaphore))
  semaphore.acquire()
  time.sleep(1)
  element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
  print "WRITE -> " + element
  output.put(element)
  time.sleep(1)
  semaphore.release()

if __name__ == '__main__':
    """
    Reads elements as soon as they are put inside queue
    """

    output    = multiprocessing.Manager().Queue()
    pool      = multiprocessing.Pool(4)
    lst       = range(40)

    # WHEN SEMAPHORE IS DEFINED HERE THEN IT DOES NOT WORK
    semaphore = multiprocessing.Semaphore(1)

    for i in lst:
        pool.apply_async(producer, (i, output))
        # print "%d Do not wait!" % i
        # res.get()

    counter = 0
    while True:
      try:
        print "READ  <- " + output.get_nowait()
        counter += 1
        if (counter == len(lst)):
          print "Break"
          break
      except:
        print "READ  <- NOTHING IN BUFFER"  
        pass
      time.sleep(1)

This version prints:

READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
READ  <- NOTHING IN BUFFER
...

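It seems that producer never writes anything to the queue. I read somewhere that apply_async does not print error messages, so in the second version I changed pool.apply_async(producer, (i, output)) to pool.apply(producer, (i, output)) to see what was going on. It turns out that semaphore is not defined; here is the output: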

Traceback (most recent call last):
  File "glob_var_wrong.py", line 31, in <module>
    pool.apply(producer, (i, output))
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 244, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
NameError: global name 'semaphore' is not defined

However, the following code runs correctly and prints 10 (the value defined in __main__):

global_var = 20

def print_global_var():
    print global_var

if __name__ == '__main__':
    global_var = 10
    print_global_var()

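In this code it seems that a global variable can be defined inside __main__, whereas in the previous code that is not possible. At first I assumed that variables defined inside __main__ are not shared between processes, but this affects only semaphore and not output, pool, or lst. Why is that?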

When you create a new process with multiprocessing.Process (which Pool uses behind the scenes), it copies the local scope, pickles it, and sends it to the new process for evaluation.

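Because you had not defined the variable semaphore before calling Pool(4), the variable is undefined in the other processes that evaluate the code, and the function producer throws an exception.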

To see this, change the definition of producer to:

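def producer(num, output):
    try:
        # Looking up the name already raises NameError, so it belongs inside the try
        print hex(id(semaphore))
        semaphore.acquire()
    except Exception as e:
        print e
        return  # nothing was acquired, so skip the rest (including release)
    time.sleep(1)
    element = "PROCESS: %d PID: %d PPID: %d" % (num, os.getpid(), os.getppid())
    print "WRITE -> " + element
    output.put(element)
    time.sleep(1)
    semaphore.release()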

Now your failing code will print a bunch (40) of errors that look like

global name 'semaphore' is not defined
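
Another way to surface the error without editing producer is to keep the object that apply_async returns and call get() on it, which re-raises the worker's exception in the parent. The sketch below is written for Python 3 (an assumption; the question uses Python 2.7) and uses multiprocessing.pool.ThreadPool, which offers the same apply_async interface as Pool, purely so that it runs without starting extra processes. The names broken_task and collect_errors are made up for this illustration.

```python
from multiprocessing.pool import ThreadPool


def broken_task(num):
    # 'semaphore' is deliberately never defined, mirroring the failing script.
    semaphore.acquire()
    return num


def collect_errors(count):
    """Submit tasks with apply_async and return the error messages they raised."""
    errors = []
    with ThreadPool(2) as pool:
        pending = [pool.apply_async(broken_task, (i,)) for i in range(count)]
        for result in pending:
            try:
                result.get(timeout=10)  # re-raises the task's exception here
            except NameError as exc:
                errors.append(str(exc))
    return errors


if __name__ == "__main__":
    for message in collect_errors(3):
        print(message)
```

If get() is never called, as in the original loop, the exception stays stored in the result object and nothing is printed.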

That is why the semaphore must be defined before Pool is called.
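
The ordering effect can be reproduced in isolation. The following is a minimal sketch for Python 3, assuming a POSIX system where the "fork" start method is available. It uses a single Process instead of a Pool to stay small; Pool starts its workers inside its constructor, so the same ordering applies there. The names report, child_sees_global, and late_global are hypothetical and exist only for this illustration.

```python
import multiprocessing


def report(queue):
    # Runs in the child: tell the parent whether the global name exists here.
    queue.put("late_global" in globals())


def child_sees_global(define_before_start):
    """Start one child and return whether it can see 'late_global'."""
    global late_global
    globals().pop("late_global", None)  # start each run from a clean state
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX only
    queue = ctx.Queue()
    child = ctx.Process(target=report, args=(queue,))
    if define_before_start:
        late_global = 1
    child.start()  # the child's copy of the globals is taken here
    if not define_before_start:
        late_global = 1  # too late: the child already has its own copy
    seen = queue.get(timeout=10)
    child.join()
    return seen


if __name__ == "__main__":
    print(child_sees_global(True))
    print(child_sees_global(False))
```

Under fork, the first call should report that the child sees the name and the second that it does not, which matches the difference between the two scripts in the question.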

Because you ran the code on Windows. On Linux you would get the expected result.

That is the difference between fork and spawn.
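
Whichever start method is in use, one way to stop depending on where and when the global is defined is to hand the semaphore to every worker explicitly through Pool's initializer and initargs parameters. The sketch below is for Python 3 and defaults to the "fork" start method (an assumption for POSIX systems; "spawn" can be passed instead, for example on Windows). The names init_worker, run, and _semaphore are hypothetical, and producer is simplified to return a string rather than write to a queue.

```python
import multiprocessing

_semaphore = None  # filled in inside each worker by init_worker


def init_worker(sem):
    """Runs once in every worker process as it starts."""
    global _semaphore
    _semaphore = sem


def producer(num):
    # Only one worker at a time may hold the semaphore.
    with _semaphore:
        return "PROCESS: %d" % num


def run(count, workers=4, method="fork"):
    # Assumption: "fork" exists on this platform; pass "spawn" otherwise.
    ctx = multiprocessing.get_context(method)
    sem = ctx.Semaphore(1)
    with ctx.Pool(workers, initializer=init_worker, initargs=(sem,)) as pool:
        return pool.map(producer, range(count))


if __name__ == "__main__":
    print(run(4))
```

Because the semaphore travels with the worker's start-up arguments, it no longer matters whether it was created at module level or inside the __main__ block.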