multiprocessing.pool 使用管理器和异步方法
multiprocessing.pool with manager and async methods
我正在尝试使用 Manager() 在进程之间共享字典并尝试了以下代码:
from multiprocessing import Manager, Pool
def f(d):
d['x'] += 2
if __name__ == '__main__':
manager = Manager()
d = manager.dict()
d['x'] = 2
p= Pool(4)
for _ in range(2000):
p.map_async(f, (d,)) #apply_async, map
p.close()
p.join()
print (d) # expects this result --> {'x': 4002}
使用map_async和apply_async,打印的结果总是不同的(例如{'x': 3838},{'x': 3770})。
但是,使用 map 会给出预期的结果。
另外,我试过用Process代替Pool,结果也不一样。
有什么见解吗?
非阻塞部分和竞争条件的问题不是由管理器处理的吗?
当您调用 map
(而不是 map_async
)时,它将阻塞,直到处理器完成您传递的所有请求,在您的情况下,这只是对函数的一次调用 f
。因此,即使您的池大小为 4,您实际上也是一次执行 2000 个进程。要实际并行执行,您应该执行单个 p.map(f, [d]*2000)
而不是循环。
但是当你调用 map_async
时,你不会阻塞并且 returned 一个结果对象。在结果对象 上对 get
的调用将 阻塞,直到进程完成,并将 return 与函数调用的结果一起使用。所以现在你 运行 一次最多有 4 个进程。但是字典的更新并没有在处理器之间序列化。我修改了代码以使用多处理锁强制序列化 d[x] += 2
。您会看到结果现在是 4002。
from multiprocessing import Manager, Pool, Lock
def f(d):
lock.acquire()
d['x'] += 2
lock.release()
def init(l):
global lock
lock = l
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
d['x'] = 2
lock = Lock()
p = Pool(4, initializer=init, initargs=(lock,)) # Create the multiprocessing lock that is sharable by all the processes
results = [] # if the function returnd a result we wanted
for _ in range(2000):
results.append(p.map_async(f, (d,))) #apply_async, map
"""
for i in range(2000): # if the function returned a result we wanted
results[i].get() # wait for everything to finish
"""
p.close()
p.join()
print(d)
我正在尝试使用 Manager() 在进程之间共享字典并尝试了以下代码:
from multiprocessing import Manager, Pool
def f(d):
d['x'] += 2
if __name__ == '__main__':
manager = Manager()
d = manager.dict()
d['x'] = 2
p= Pool(4)
for _ in range(2000):
p.map_async(f, (d,)) #apply_async, map
p.close()
p.join()
print (d) # expects this result --> {'x': 4002}
使用map_async和apply_async,打印的结果总是不同的(例如{'x': 3838},{'x': 3770})。 但是,使用 map 会给出预期的结果。 另外,我试过用Process代替Pool,结果也不一样。
有什么见解吗? 非阻塞部分和竞争条件的问题不是由管理器处理的吗?
当您调用 map
(而不是 map_async
)时,它将阻塞,直到处理器完成您传递的所有请求,在您的情况下,这只是对函数的一次调用 f
。因此,即使您的池大小为 4,您实际上也是一次执行 2000 个进程。要实际并行执行,您应该执行单个 p.map(f, [d]*2000)
而不是循环。
但是当你调用 map_async
时,你不会阻塞并且 returned 一个结果对象。在结果对象 上对 get
的调用将 阻塞,直到进程完成,并将 return 与函数调用的结果一起使用。所以现在你 运行 一次最多有 4 个进程。但是字典的更新并没有在处理器之间序列化。我修改了代码以使用多处理锁强制序列化 d[x] += 2
。您会看到结果现在是 4002。
from multiprocessing import Manager, Pool, Lock
def f(d):
lock.acquire()
d['x'] += 2
lock.release()
def init(l):
global lock
lock = l
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
d['x'] = 2
lock = Lock()
p = Pool(4, initializer=init, initargs=(lock,)) # Create the multiprocessing lock that is sharable by all the processes
results = [] # if the function returnd a result we wanted
for _ in range(2000):
results.append(p.map_async(f, (d,))) #apply_async, map
"""
for i in range(2000): # if the function returned a result we wanted
results[i].get() # wait for everything to finish
"""
p.close()
p.join()
print(d)