Initialize each instance for each worker of multiprocessing

I use a Worker class as a container for shared (static) data and a database connection:

import os
import multiprocessing as mp
import time

class Worker:
    def __init__(self, data):
        self.data = data
    def initializer(self):
        # setting up a database connection
        print("{} with PID {} initialized".format(self, os.getpid()))
    def __call__(self, value):
        print("{} with PID {} called with value={}".format(self, os.getpid(), value))
        # doing something with self.data and write it to the database
        time.sleep(0.5)

def main():
    print("main has PID {}".format(os.getpid()))
    worker = Worker([1,2,3,4])
    print("{} in main".format(worker))
    pool = mp.Pool(processes=2, initializer=worker.initializer)
    pool.map(worker, range(4))

if __name__=="__main__":
    main()

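However, the initialization does not work. I think this is because initializer() and __call__() are called on different instances. The code above produces the following output: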

main has PID 5
<__main__.Worker object at 0xA> in main
<__mp_main__.Worker object at 0xB> with PID 6 initialized
<__mp_main__.Worker object at 0xC> with PID 6 called with value=0
<__mp_main__.Worker object at 0xD> with PID 7 initialized
<__mp_main__.Worker object at 0xE> with PID 7 called with value=1
<__mp_main__.Worker object at 0xD> with PID 6 called with value=2
<__mp_main__.Worker object at 0xE> with PID 7 called with value=3

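Obviously, the address of the Worker instance has to change with the process ID (PID), since fork creates a new but duplicated instance. But why does the instance also differ between initializer() and __call__() within the same PID? As a result, my database connection is not available.

Why does this happen? How can I do this correctly without global variables and the like?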

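In each process you see two Worker instances, and both are copies of your worker object: one arrives with the initializer (the bound method worker.initializer), and the other arrives pickled with the tasks passed to pool.map.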
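To see why there are two instances, here is a minimal sketch that imitates the transfer with pickle directly, without a pool. It assumes the spawn start method (which the __mp_main__ prefix in your output suggests), where both the initializer and the tasks reach the child pickled. The db attribute and the "connection" string are placeholders I added for illustration, not part of your code.

```python
import pickle

class Worker:
    def __init__(self, data):
        self.data = data
        self.db = None  # hypothetical attribute standing in for the connection

    def initializer(self):
        self.db = "connection"  # placeholder for a real database connection

    def __call__(self, value):
        return self.db

worker = Worker([1, 2, 3, 4])

# Pickle the bound method and the instance separately, which is roughly
# what happens to the initializer and to the mapped callable.
init_copy = pickle.loads(pickle.dumps(worker.initializer))
call_copy = pickle.loads(pickle.dumps(worker))

init_copy()  # sets db only on the instance bound to init_copy

print(init_copy.__self__ is call_copy)  # False
print(init_copy.__self__.db)            # connection
print(call_copy(0))                     # None
```

Each round trip through pickle builds a fresh Worker, so whatever initializer() stores on self never reaches the instance that is later called.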

The solution is indeed to use a global. Globals are not shared between processes, so each process gets its own copy.

import os
import multiprocessing as mp
import time

class Worker:
    def __init__(self, data):
        self.data = data

    def initializer(self):
        # setting up a database connection
        global db
        print("{} with PID {} initialized".format(self, os.getpid()))
        db = "set by {} with PID {} initialized".format(self, os.getpid())

    def __call__(self, value):
        print("{} with PID {} called with value={}".format(self, os.getpid(), value))
        global db
        print("Using db {}".format(db))
        # doing something with self.data and write it to the database
        time.sleep(0.5)

def main():
    print("main has PID {}".format(os.getpid()))
    worker = Worker([1,2,3,4])
    pool = mp.Pool(processes=2, initializer=worker.initializer)
    pool.map(worker, range(4))

if __name__=="__main__":
    main()

This produces:

main has PID 29128
<__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
<__mp_main__.Worker object at 0x0000022319453AF0> with PID 30832 called with value=0
Using db set by <__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
<__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized
<__mp_main__.Worker object at 0x00000167E6B53AF0> with PID 29900 called with value=1
Using db set by <__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized
<__mp_main__.Worker object at 0x0000022319453AF0> with PID 30832 called with value=2
Using db set by <__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
<__mp_main__.Worker object at 0x00000167E6B53AF0> with PID 29900 called with value=3
Using db set by <__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized

As another way to solve this problem, see:

import os
import multiprocessing as mp
import time

class Worker:
    def __init__(self, data):
        self.data = data

    def initializer(self):
        # setting up a database connection
        print("{} with PID {} initialized".format(self, os.getpid()))

    def __call__(self, value):
        print("{} with PID {} called with value={}".format(self, os.getpid(), value))
        # doing something with self.data and write it to the database
        time.sleep(0.5)

def worker_initializer(data):
    global worker
    worker = Worker(data)
    worker.initializer()

def worker_call(*args, **kwds):
    return worker(*args, **kwds)

def main():
    print("main has PID {}".format(os.getpid()))
    with mp.Pool(processes=2, initializer=worker_initializer, initargs=([1,2,3,4],)) as pool:
        pool.map(worker_call, range(4))

if __name__=="__main__":
    main()

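I left your Worker class unchanged but introduced a pair of helper functions to call the worker appropriately. Basically, using processes means using global state (in the child processes, anyway) in all but trivial uses.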

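Note that the output is a bit confusing: the two child processes will allocate their new Worker at the same address (each in its own address space), so the same address shows up more than once.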
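If the repeated addresses get in the way while debugging, one option is to put the PID into the object's printed form. This is only a sketch: the __repr__ below is my own addition to your Worker class, and its format is arbitrary.

```python
import os

class Worker:
    def __init__(self, data):
        self.data = data

    def __repr__(self):
        # Pair the address with the PID, since an address is only
        # meaningful inside the process that owns it.
        return "<Worker pid={} id={:#x}>".format(os.getpid(), id(self))

print(Worker([1, 2, 3, 4]))
```

Since the existing print calls format self, they would pick this up without further changes.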