如何在 Python 中使用 multiprocessing.pool 创建全局 lock/semaphore?
How to create global lock/semaphore with multiprocessing.pool in Python?
我想限制子进程中的资源访问。例如 - limit http downloads, disk io, etc.. 我怎样才能实现扩展这个基本代码?
请分享一些基本的代码示例。
pool = multiprocessing.Pool(multiprocessing.cpu_count())
while job_queue.is_jobs_for_processing():
for job in job_queue.pull_jobs_for_processing:
pool.apply_async(do_job, callback = callback)
pool.close()
pool.join()
创建池时使用 initializer 和 initargs 参数,以便在所有子进程中定义全局。
例如:
from multiprocessing import Pool, Lock
from time import sleep
def do_job(i):
"The greater i is, the shorter the function waits before returning."
with lock:
sleep(1-(i/10.))
return i
def init_child(lock_):
global lock
lock = lock_
def main():
lock = Lock()
poolsize = 4
with Pool(poolsize, initializer=init_child, initargs=(lock,)) as pool:
results = pool.imap_unordered(do_job, range(poolsize))
print(list(results))
if __name__ == "__main__":
main()
此代码将按升序(作业提交的顺序)打印出数字 0-3,因为它使用了锁。注释掉 with lock:
行以查看它按降序打印出数字。
此解决方案适用于 windows 和 unix。然而,因为进程可以在 unix 系统上进行 fork,所以 unix 只需要在模块范围内声明全局变量。子进程获得父进程内存的副本,其中包括仍然有效的锁对象。因此,初始化程序并不是严格需要的,但它可以帮助记录代码的预期工作方式。当 multiprocessing 能够通过分叉创建进程时,以下内容也有效。
from multiprocessing import Pool, Lock
from time import sleep
lock = Lock()
def do_job(i):
"The greater i is, the shorter the function waits before returning."
with lock:
sleep(1-(i/10.))
return i
def main():
poolsize = 4
with Pool(poolsize) as pool:
results = pool.imap_unordered(do_job, range(poolsize))
print(list(results))
if __name__ == "__main__":
main()
如果您正在访问资源,请使用全局信号量并获取它。例如:
import multiprocessing
from time import sleep
semaphore = multiprocessing.Semaphore(2)
def do_job(id):
with semaphore:
sleep(1)
print("Finished job")
def main():
pool = multiprocessing.Pool(6)
for job_id in range(6):
print("Starting job")
pool.apply_async(do_job, [job_id])
pool.close()
pool.join()
if __name__ == "__main__":
main()
这个程序每秒只完成两个作业,因为其他线程正在等待信号量。
我想限制子进程中的资源访问。例如 - limit http downloads, disk io, etc.. 我怎样才能实现扩展这个基本代码?
请分享一些基本的代码示例。
pool = multiprocessing.Pool(multiprocessing.cpu_count())
while job_queue.is_jobs_for_processing():
for job in job_queue.pull_jobs_for_processing:
pool.apply_async(do_job, callback = callback)
pool.close()
pool.join()
创建池时使用 initializer 和 initargs 参数,以便在所有子进程中定义全局。
例如:
from multiprocessing import Pool, Lock
from time import sleep
def do_job(i):
"The greater i is, the shorter the function waits before returning."
with lock:
sleep(1-(i/10.))
return i
def init_child(lock_):
global lock
lock = lock_
def main():
lock = Lock()
poolsize = 4
with Pool(poolsize, initializer=init_child, initargs=(lock,)) as pool:
results = pool.imap_unordered(do_job, range(poolsize))
print(list(results))
if __name__ == "__main__":
main()
此代码将按升序(作业提交的顺序)打印出数字 0-3,因为它使用了锁。注释掉 with lock:
行以查看它按降序打印出数字。
此解决方案适用于 windows 和 unix。然而,因为进程可以在 unix 系统上进行 fork,所以 unix 只需要在模块范围内声明全局变量。子进程获得父进程内存的副本,其中包括仍然有效的锁对象。因此,初始化程序并不是严格需要的,但它可以帮助记录代码的预期工作方式。当 multiprocessing 能够通过分叉创建进程时,以下内容也有效。
from multiprocessing import Pool, Lock
from time import sleep
lock = Lock()
def do_job(i):
"The greater i is, the shorter the function waits before returning."
with lock:
sleep(1-(i/10.))
return i
def main():
poolsize = 4
with Pool(poolsize) as pool:
results = pool.imap_unordered(do_job, range(poolsize))
print(list(results))
if __name__ == "__main__":
main()
如果您正在访问资源,请使用全局信号量并获取它。例如:
import multiprocessing
from time import sleep
semaphore = multiprocessing.Semaphore(2)
def do_job(id):
with semaphore:
sleep(1)
print("Finished job")
def main():
pool = multiprocessing.Pool(6)
for job_id in range(6):
print("Starting job")
pool.apply_async(do_job, [job_id])
pool.close()
pool.join()
if __name__ == "__main__":
main()
这个程序每秒只完成两个作业,因为其他线程正在等待信号量。