如何在 python 多处理 map_async 函数中共享字典?
How to share a dictionary in python multiprocessing map_async function?
我在 python 中有一个列表 lst
。我想在此列表的每一项上调用一个函数 f
。此函数 f
调用第三方函数 g
。我还想测量列表 lst
中的每个项目对 g
的每个函数调用所花费的时间。我想加快这个过程,所以我使用多处理池来并行执行。目前,我有以下代码,但它不起作用。我从thispost了解到map_async
只能调用一元函数。我还想利用在 map_async
中创建多个进程的优势,因此我不想切换到 apply_async
。有人可以建议我这里有什么更好的选择来实现我的目标吗?
我当前的解决方案不起作用:
import multiprocessing as mp
time_metrics = {}
def f(idx):
global time_metrics
a = time.now()
g(idx)
b = time.now()
time_metrics[idx] = b-a
lst = [1, 2, 3, 4, 5, 6]
pool = mp.Pool(7)
pool.map_async(f, lst)
pool.close()
pool.join()
print(time_metrics)
Multiprocessing 不共享内存 space,它使用进程 'forks' 克隆当前进程状态(或仅克隆其中的一部分,具体取决于 [=27= 的类型) ] 使用和操作系统)到 RAM 中的新位置,并分配给一个新的进程 ID,然后独立运行。如果你想使用共享内存区域,任务会变得更加复杂,而且我发现在我的一些旧项目中共享内存比使用队列将数据传回父进程并存储到字典中要慢。
对于此任务,虽然在我看来您不需要执行任何操作,但您可以只 return 时间值,然后在池完成执行后(在同步模式下,不是异步,以便进程池阻塞,直到所有进程完成任务)您可以迭代并收集结果。
所以这可能是最简单的解决方案:
from datetime import datetime
import multiprocessing as mp
time_metrics = {}
def g(a):
# placeholder function for whatever you have as g()
for i in range(5000*a):
pass
def f(idx):
# once spawned, a process calling this function cannot edit objects in the memory of the parent process,
# unless using the special shared memory objects in the mp class.
a = datetime.utcnow()
g(idx)
b = datetime.utcnow()
return (idx, b - a)
if __name__ == "__main__":
lst = [1, 2, 3, 4, 5, 6]
# don't assign 1 process for each job, use only number of cores your machine has, as rarely any benefit of using more, especially for benchmarking.
with mp.Pool() as pool:
# blocks until result is available
results = pool.map(f, lst)
for row in results:
time_metrics[row[0]] = row[1]
print(time_metrics)
如果您有兴趣,可以将其重构为使用多处理库中的共享内存字典或 mp.Queue 的实例将结果传回父进程进行收集,但这不是必需的据我所知这个问题。
您是否真的需要使用池的异步版本,或者这种方法是否足够?
如果您真的想使用 map_async,此代码段有效:
from datetime import datetime
import multiprocessing as mp
time_metrics = {}
def g(a):
for i in range(5000*a):
pass
def f(idx):
a = datetime.utcnow()
g(idx)
b = datetime.utcnow()
return (idx, b - a)
def append_res(result: tuple):
for row in result:
time_metrics[row[0]] = row[1]
if __name__ == "__main__":
lst = [1, 2, 3, 4, 5, 6]
# don't assign 1 process for each job, use only number of cores your machine has, as rarely any benefit of using more, especially for benchmarking.
with mp.Pool() as pool:
# doesn't block until result is available.
# callback is applied to list of results when all the tasks are complete
results = pool.map_async(f, lst, callback = append_res)
# wait for result to become available, otherwise parent process will exit the context manager and processes will not complete
results.wait()
print(time_metrics)
我不是 100% 确定 .map_async() 与 .map() 相比的行为,.map() 将按顺序将函数应用于可迭代对象并且不会启动新任务,直到分配给进程的前一个任务完成。这使得基准测试很有用,只要机器上的每个 CPU 核心不处理比核心更多的 python 进程,因为这只会增加开销和负载,并且会给你不准确的结果基准。对于 map_async,通常对于异步函数,单个结果可用的顺序可能不是它们的分配顺序,这对我来说意味着所有任务都同时分配给进程池可能会在任务之间产生对 CPU 资源的竞争,并且可能会产生不准确的基准,尽管我可以与某人一起在评论中对此进行确认。
我在 python 中有一个列表 lst
。我想在此列表的每一项上调用一个函数 f
。此函数 f
调用第三方函数 g
。我还想测量列表 lst
中的每个项目对 g
的每个函数调用所花费的时间。我想加快这个过程,所以我使用多处理池来并行执行。目前,我有以下代码,但它不起作用。我从thispost了解到map_async
只能调用一元函数。我还想利用在 map_async
中创建多个进程的优势,因此我不想切换到 apply_async
。有人可以建议我这里有什么更好的选择来实现我的目标吗?
我当前的解决方案不起作用:
import multiprocessing as mp
time_metrics = {}
def f(idx):
global time_metrics
a = time.now()
g(idx)
b = time.now()
time_metrics[idx] = b-a
lst = [1, 2, 3, 4, 5, 6]
pool = mp.Pool(7)
pool.map_async(f, lst)
pool.close()
pool.join()
print(time_metrics)
Multiprocessing 不共享内存 space,它使用进程 'forks' 克隆当前进程状态(或仅克隆其中的一部分,具体取决于 [=27= 的类型) ] 使用和操作系统)到 RAM 中的新位置,并分配给一个新的进程 ID,然后独立运行。如果你想使用共享内存区域,任务会变得更加复杂,而且我发现在我的一些旧项目中共享内存比使用队列将数据传回父进程并存储到字典中要慢。
对于此任务,虽然在我看来您不需要执行任何操作,但您可以只 return 时间值,然后在池完成执行后(在同步模式下,不是异步,以便进程池阻塞,直到所有进程完成任务)您可以迭代并收集结果。
所以这可能是最简单的解决方案:
from datetime import datetime
import multiprocessing as mp
time_metrics = {}
def g(a):
# placeholder function for whatever you have as g()
for i in range(5000*a):
pass
def f(idx):
# once spawned, a process calling this function cannot edit objects in the memory of the parent process,
# unless using the special shared memory objects in the mp class.
a = datetime.utcnow()
g(idx)
b = datetime.utcnow()
return (idx, b - a)
if __name__ == "__main__":
lst = [1, 2, 3, 4, 5, 6]
# don't assign 1 process for each job, use only number of cores your machine has, as rarely any benefit of using more, especially for benchmarking.
with mp.Pool() as pool:
# blocks until result is available
results = pool.map(f, lst)
for row in results:
time_metrics[row[0]] = row[1]
print(time_metrics)
如果您有兴趣,可以将其重构为使用多处理库中的共享内存字典或 mp.Queue 的实例将结果传回父进程进行收集,但这不是必需的据我所知这个问题。
您是否真的需要使用池的异步版本,或者这种方法是否足够?
如果您真的想使用 map_async,此代码段有效:
from datetime import datetime
import multiprocessing as mp
time_metrics = {}
def g(a):
for i in range(5000*a):
pass
def f(idx):
a = datetime.utcnow()
g(idx)
b = datetime.utcnow()
return (idx, b - a)
def append_res(result: tuple):
for row in result:
time_metrics[row[0]] = row[1]
if __name__ == "__main__":
lst = [1, 2, 3, 4, 5, 6]
# don't assign 1 process for each job, use only number of cores your machine has, as rarely any benefit of using more, especially for benchmarking.
with mp.Pool() as pool:
# doesn't block until result is available.
# callback is applied to list of results when all the tasks are complete
results = pool.map_async(f, lst, callback = append_res)
# wait for result to become available, otherwise parent process will exit the context manager and processes will not complete
results.wait()
print(time_metrics)
我不是 100% 确定 .map_async() 与 .map() 相比的行为,.map() 将按顺序将函数应用于可迭代对象并且不会启动新任务,直到分配给进程的前一个任务完成。这使得基准测试很有用,只要机器上的每个 CPU 核心不处理比核心更多的 python 进程,因为这只会增加开销和负载,并且会给你不准确的结果基准。对于 map_async,通常对于异步函数,单个结果可用的顺序可能不是它们的分配顺序,这对我来说意味着所有任务都同时分配给进程池可能会在任务之间产生对 CPU 资源的竞争,并且可能会产生不准确的基准,尽管我可以与某人一起在评论中对此进行确认。