为 multiprocessing 的每个 worker 初始化各自的实例
Initialize one instance per worker in multiprocessing
我使用一个 Worker 类作为共享(静态)数据和数据库连接的容器:
import os
import multiprocessing as mp
import time
class Worker:
    """Callable container for shared (static) data used by pool tasks.

    ``initializer`` is intended to be handed to ``mp.Pool`` as the
    per-process setup hook, and each pool task invokes the instance
    through ``__call__``.
    """

    def __init__(self, data):
        # Payload shared by every call on this instance.
        self.data = data

    def initializer(self):
        """Per-process setup hook (placeholder for opening a DB connection)."""
        pid = os.getpid()
        print("{} with PID {} initialized".format(self, pid))

    def __call__(self, value):
        """Handle one *value*; currently it only logs and simulates work."""
        pid = os.getpid()
        message = "{} with PID {} called with value={}".format(self, pid, value)
        print(message)
        # TODO: use self.data and write the result to the database.
        time.sleep(0.5)
def main():
    """Run the demo: map ``range(4)`` over one ``Worker`` in a 2-process pool.

    ``worker.initializer`` is passed as the pool initializer and ``worker``
    itself as the mapped callable. Each reaches a child process as a
    separate copy of ``worker``, so ``initializer`` and ``__call__`` run on
    different instances there.
    """
    print("main has PID {}".format(os.getpid()))
    worker = Worker([1, 2, 3, 4])
    # Use the pool as a context manager so its worker processes are shut
    # down deterministically instead of being left to garbage collection.
    with mp.Pool(processes=2, initializer=worker.initializer) as pool:
        pool.map(worker, range(4))


if __name__ == "__main__":
    main()
但是,初始化没有起作用。我认为这是因为 initializer() 和 __call__() 是由不同的实例调用的。上述代码的输出如下:
main has PID 5
<__main__.Worker object at 0xA> in main
<__mp_main__.Worker object at 0xB> with PID 6 initialized
<__mp_main__.Worker object at 0xC> with PID 6 called with value=0
<__mp_main__.Worker object at 0xD> with PID 7 initialized
<__mp_main__.Worker object at 0xE> with PID 7 called with value=1
<__mp_main__.Worker object at 0xD> with PID 6 called with value=2
<__mp_main__.Worker object at 0xE> with PID 7 called with value=3
显然,Worker 实例的地址会随进程 ID (PID) 的不同而变化,因为 fork 会产生一个新的、重复的实例。但为什么在同一个 PID 中,调用 initializer() 和 __call__() 的实例也不一样?这导致我的数据库连接不可用。
为什么会这样?如何在不使用全局变量等手段的情况下正确实现?
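在每个进程中,您会看到两个不同的 Worker 实例,它们都是您的 worker 对象的副本:一个是随 initializer 参数(绑定方法 worker.initializer)传入子进程的副本,另一个是 pool.map 分发任务时再次传入的副本。因此 initializer() 和 __call__() 作用在不同的对象上,initializer() 保存到 self 上的数据库连接,__call__() 是看不到的。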
解决办法确实是使用全局变量。全局变量不会在进程之间共享——每个子进程都有自己独立的一份,所以这样做不会相互干扰。
import os
import multiprocessing as mp
import time
class Worker:
    """Callable holding shared data; keeps a per-process ``db`` global.

    ``initializer`` assigns the module-level name ``db`` in whichever
    process runs it, and ``__call__`` reads that same name back. Module
    globals are not shared between processes, so each pool worker process
    ends up with its own ``db`` value.
    """

    def __init__(self, data):
        # Payload shared by every call on this instance.
        self.data = data

    def initializer(self):
        """Create this process's ``db`` global (stand-in for a DB connection)."""
        global db
        pid = os.getpid()
        print("{} with PID {} initialized".format(self, pid))
        db = "set by {} with PID {} initialized".format(self, pid)

    def __call__(self, value):
        """Handle *value* using the ``db`` global set by ``initializer``."""
        pid = os.getpid()
        print("{} with PID {} called with value={}".format(self, pid, value))
        # Only reading the module global here, so no ``global`` statement is needed.
        print("Using db {}".format(db))
        # TODO: use self.data and write the result to the database.
        time.sleep(0.5)
def main():
    """Run the demo: map ``range(4)`` over one ``Worker`` in a 2-process pool.

    ``worker.initializer`` runs once in each child process and stores the
    per-process ``db`` global; every mapped call then reads that global,
    even though it runs on a different copy of ``worker``.
    """
    print("main has PID {}".format(os.getpid()))
    worker = Worker([1, 2, 3, 4])
    # Use the pool as a context manager so its worker processes are shut
    # down deterministically instead of being left to garbage collection.
    with mp.Pool(processes=2, initializer=worker.initializer) as pool:
        pool.map(worker, range(4))


if __name__ == "__main__":
    main()
输出:
main has PID 29128
<__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
<__mp_main__.Worker object at 0x0000022319453AF0> with PID 30832 called with value=0
Using db set by <__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
<__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized
<__mp_main__.Worker object at 0x00000167E6B53AF0> with PID 29900 called with value=1
Using db set by <__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized
<__mp_main__.Worker object at 0x0000022319453AF0> with PID 30832 called with value=2
Using db set by <__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
<__mp_main__.Worker object at 0x00000167E6B53AF0> with PID 29900 called with value=3
Using db set by <__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized
另一种解决方法如下:
import os
import multiprocessing as mp
import time
class Worker:
    """Callable container for shared (static) data used by pool tasks.

    ``initializer`` is a per-process setup hook and ``__call__`` handles a
    single task value.
    """

    def __init__(self, data):
        # Payload shared by every call on this instance.
        self.data = data

    def initializer(self):
        """Per-process setup hook (placeholder for opening a DB connection)."""
        pid = os.getpid()
        print("{} with PID {} initialized".format(self, pid))

    def __call__(self, value):
        """Handle one *value*; currently it only logs and simulates work."""
        pid = os.getpid()
        message = "{} with PID {} called with value={}".format(self, pid, value)
        print(message)
        # TODO: use self.data and write the result to the database.
        time.sleep(0.5)
def worker_initializer(data):
    """Pool initializer: build this process's ``Worker`` and run its setup.

    The instance is published as the module global ``worker`` so that
    ``worker_call`` running later in the same process reaches the very
    same object that ``initializer()`` was called on.
    """
    global worker
    instance = Worker(data)
    worker = instance
    instance.initializer()
def worker_call(*args, **kwds):
    """Forward a pool task to this process's global ``worker`` and return its result."""
    target = worker
    return target(*args, **kwds)
def main():
    """Run the demo with one ``Worker`` built inside each pool process.

    ``worker_initializer`` receives the shared data through ``initargs``,
    and ``worker_call`` forwards each mapped value to that per-process
    instance.
    """
    print("main has PID {}".format(os.getpid()))
    shared_data = [1, 2, 3, 4]
    pool = mp.Pool(
        processes=2,
        initializer=worker_initializer,
        initargs=(shared_data,),
    )
    with pool:
        pool.map(worker_call, range(4))


if __name__ == "__main__":
    main()
我保持您的 Worker 类不变,只是另外引入了一组辅助函数来正确地调用 worker。基本上,使用多进程(Process)意味着除了最简单的用途之外,都需要使用全局状态(至少在子进程中如此)。
请注意,这段代码的输出可能有点令人困惑:两个子进程会在相同的地址(各自地址空间中的同一地址)上分配新的 Worker,因此打印出的地址会出现重复。
我使用一个 Worker 类作为共享(静态)数据和数据库连接的容器:
import os
import multiprocessing as mp
import time
class Worker:
    """Callable container for shared (static) data used by pool tasks.

    ``initializer`` is intended to be handed to ``mp.Pool`` as the
    per-process setup hook, and each pool task invokes the instance
    through ``__call__``.
    """

    def __init__(self, data):
        # Payload shared by every call on this instance.
        self.data = data

    def initializer(self):
        """Per-process setup hook (placeholder for opening a DB connection)."""
        pid = os.getpid()
        print("{} with PID {} initialized".format(self, pid))

    def __call__(self, value):
        """Handle one *value*; currently it only logs and simulates work."""
        pid = os.getpid()
        message = "{} with PID {} called with value={}".format(self, pid, value)
        print(message)
        # TODO: use self.data and write the result to the database.
        time.sleep(0.5)
def main():
    """Run the demo: map ``range(4)`` over one ``Worker`` in a 2-process pool.

    ``worker.initializer`` is passed as the pool initializer and ``worker``
    itself as the mapped callable. Each reaches a child process as a
    separate copy of ``worker``, so ``initializer`` and ``__call__`` run on
    different instances there.
    """
    print("main has PID {}".format(os.getpid()))
    worker = Worker([1, 2, 3, 4])
    # Use the pool as a context manager so its worker processes are shut
    # down deterministically instead of being left to garbage collection.
    with mp.Pool(processes=2, initializer=worker.initializer) as pool:
        pool.map(worker, range(4))


if __name__ == "__main__":
    main()
但是,初始化没有起作用。我认为这是因为 initializer() 和 __call__() 是由不同的实例调用的。上述代码的输出如下:
main has PID 5
<__main__.Worker object at 0xA> in main
<__mp_main__.Worker object at 0xB> with PID 6 initialized
<__mp_main__.Worker object at 0xC> with PID 6 called with value=0
<__mp_main__.Worker object at 0xD> with PID 7 initialized
<__mp_main__.Worker object at 0xE> with PID 7 called with value=1
<__mp_main__.Worker object at 0xD> with PID 6 called with value=2
<__mp_main__.Worker object at 0xE> with PID 7 called with value=3
显然,Worker 实例的地址会随进程 ID (PID) 的不同而变化,因为 fork 会产生一个新的、重复的实例。但为什么在同一个 PID 中,调用 initializer() 和 __call__() 的实例也不一样?这导致我的数据库连接不可用。
为什么会这样?如何在不使用全局变量等手段的情况下正确实现?
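在每个进程中,您会看到两个不同的 Worker 实例,它们都是您的 worker 对象的副本:一个是随 initializer 参数(绑定方法 worker.initializer)传入子进程的副本,另一个是 pool.map 分发任务时再次传入的副本。因此 initializer() 和 __call__() 作用在不同的对象上,initializer() 保存到 self 上的数据库连接,__call__() 是看不到的。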
解决办法确实是使用全局变量。全局变量不会在进程之间共享——每个子进程都有自己独立的一份,所以这样做不会相互干扰。
import os
import multiprocessing as mp
import time
class Worker:
    """Callable holding shared data; keeps a per-process ``db`` global.

    ``initializer`` assigns the module-level name ``db`` in whichever
    process runs it, and ``__call__`` reads that same name back. Module
    globals are not shared between processes, so each pool worker process
    ends up with its own ``db`` value.
    """

    def __init__(self, data):
        # Payload shared by every call on this instance.
        self.data = data

    def initializer(self):
        """Create this process's ``db`` global (stand-in for a DB connection)."""
        global db
        pid = os.getpid()
        print("{} with PID {} initialized".format(self, pid))
        db = "set by {} with PID {} initialized".format(self, pid)

    def __call__(self, value):
        """Handle *value* using the ``db`` global set by ``initializer``."""
        pid = os.getpid()
        print("{} with PID {} called with value={}".format(self, pid, value))
        # Only reading the module global here, so no ``global`` statement is needed.
        print("Using db {}".format(db))
        # TODO: use self.data and write the result to the database.
        time.sleep(0.5)
def main():
    """Run the demo: map ``range(4)`` over one ``Worker`` in a 2-process pool.

    ``worker.initializer`` runs once in each child process and stores the
    per-process ``db`` global; every mapped call then reads that global,
    even though it runs on a different copy of ``worker``.
    """
    print("main has PID {}".format(os.getpid()))
    worker = Worker([1, 2, 3, 4])
    # Use the pool as a context manager so its worker processes are shut
    # down deterministically instead of being left to garbage collection.
    with mp.Pool(processes=2, initializer=worker.initializer) as pool:
        pool.map(worker, range(4))


if __name__ == "__main__":
    main()
输出:
main has PID 29128
<__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
<__mp_main__.Worker object at 0x0000022319453AF0> with PID 30832 called with value=0
Using db set by <__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
<__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized
<__mp_main__.Worker object at 0x00000167E6B53AF0> with PID 29900 called with value=1
Using db set by <__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized
<__mp_main__.Worker object at 0x0000022319453AF0> with PID 30832 called with value=2
Using db set by <__mp_main__.Worker object at 0x0000022319997A00> with PID 30832 initialized
<__mp_main__.Worker object at 0x00000167E6B53AF0> with PID 29900 called with value=3
Using db set by <__mp_main__.Worker object at 0x00000167E7077A00> with PID 29900 initialized
另一种解决方法如下:
import os
import multiprocessing as mp
import time
class Worker:
    """Callable container for shared (static) data used by pool tasks.

    ``initializer`` is a per-process setup hook and ``__call__`` handles a
    single task value.
    """

    def __init__(self, data):
        # Payload shared by every call on this instance.
        self.data = data

    def initializer(self):
        """Per-process setup hook (placeholder for opening a DB connection)."""
        pid = os.getpid()
        print("{} with PID {} initialized".format(self, pid))

    def __call__(self, value):
        """Handle one *value*; currently it only logs and simulates work."""
        pid = os.getpid()
        message = "{} with PID {} called with value={}".format(self, pid, value)
        print(message)
        # TODO: use self.data and write the result to the database.
        time.sleep(0.5)
def worker_initializer(data):
    """Pool initializer: build this process's ``Worker`` and run its setup.

    The instance is published as the module global ``worker`` so that
    ``worker_call`` running later in the same process reaches the very
    same object that ``initializer()`` was called on.
    """
    global worker
    instance = Worker(data)
    worker = instance
    instance.initializer()
def worker_call(*args, **kwds):
    """Forward a pool task to this process's global ``worker`` and return its result."""
    target = worker
    return target(*args, **kwds)
def main():
    """Run the demo with one ``Worker`` built inside each pool process.

    ``worker_initializer`` receives the shared data through ``initargs``,
    and ``worker_call`` forwards each mapped value to that per-process
    instance.
    """
    print("main has PID {}".format(os.getpid()))
    shared_data = [1, 2, 3, 4]
    pool = mp.Pool(
        processes=2,
        initializer=worker_initializer,
        initargs=(shared_data,),
    )
    with pool:
        pool.map(worker_call, range(4))


if __name__ == "__main__":
    main()
我保持您的 Worker 类不变,只是另外引入了一组辅助函数来正确地调用 worker。基本上,使用多进程(Process)意味着除了最简单的用途之外,都需要使用全局状态(至少在子进程中如此)。
请注意,这段代码的输出可能有点令人困惑:两个子进程会在相同的地址(各自地址空间中的同一地址)上分配新的 Worker,因此打印出的地址会出现重复。