如何通过多个参数正确地开始并行执行两个函数?
how to properly start parallel executing of two functions over multiple arguments?
我正在寻找一种方法来并行启动两个函数,每个函数都在一组给定的不同参数上执行。我使用 pool.map
来实现这一点。我创建了两个不同的进程,每个进程启动一个执行 map
的池。这行得通 - 执行顺序有点乱,但我会把它留到另一个问题。
现在我也找到了另一种方法here(见第一个答案)。他们只使用一个池并连续调用 map_async
两次。所以我想知道,是否有更好的方法来做到这一点?
因为我读过(遗憾的是我不记得在哪里)最好只使用一个池,这意味着第二种方法(只使用一个池)更好。但是当我测量时间时,第一种方法(在不同的进程中使用两个池)实际上要快一点。此外,在第一种方法中,函数实际上是 运行 并行的,而在第二种方法中,首先执行 map_async
的第一个调用,然后执行第二个调用。
这是我的测试代码:
from multiprocessing import Process, Pool
import time
import os
multiple_pools = True
data = list(range(1, 11))
def func_a(param):
print(f'running func_a in process {os.getpid()}')
print(f'passed argument: {param}')
print('calculating...\n')
time.sleep(1.5)
print('done\n')
def func_b(param):
print(f'running func_b in process {os.getpid()}')
print(f'passed argument: {param}')
print('calculating...\n')
time.sleep(2.5)
print('done\n')
def execute_func(func, param):
p = Pool(processes=8)
with p:
p.map(func, param)
if __name__ == '__main__':
if not multiple_pools:
t0 = time.time()
p = Pool(processes=8)
res = p.map_async(func_a, data)
res = p.map_async(func_b, data)
p.close()
p.join()
t1 = time.time()
dt = t1 -t0
print(f'time spent with one pool: {dt} s')
else:
t0 = time.time()
p1 = Process(target=execute_func, args=(func_a, data))
p2 = Process(target=execute_func, args=(func_b, data))
p1.start()
p2.start()
p1.join()
p2.join()
p1.close()
p2.close()
t1=time.time()
dt = t1 -t0
print(f'time spent with two pools, each inside an own process: {dt} s')
那么,我的问题又来了:有没有一种方法优于另一种方法?或者甚至 other/better 方法可以做到这一点?
首先,我假设当您使用两个池时,您将使用非阻塞 map_async
方法。我会说两个大小为 N 的池,每个池中您向所有任务都相同的每个池提交 M 个任务(即,就 CPU、I/O 等而言需要相同的资源) ) 应该是 或多或少 在时间上相当于将相同的 2 * M 任务提交到大小为 2 * N 的单个池。
下面的程序演示了这两种情况:
from multiprocessing import Pool
import time
QUARTER_SECOND_ITERATIONS = 5_000_000
def quarter_second(x):
sum = 0
for _ in range(QUARTER_SECOND_ITERATIONS):
sum += 1
return x * x
def callback(result):
global callback_count
print('Two pools result:', result)
callback_count += 1
if callback_count == 2:
# Both map-async calls have completed:
print('Two pools time:', time.time() - start_time)
# required for Windows:
if __name__ == '__main__':
data1 = range(10)
data2 = range(10, 20)
# Two pools:
pool1 = Pool(4)
pool2 = Pool(4)
callback_count = 0
start_time = time.time()
pool1.map_async(quarter_second, data1, callback=callback)
pool2.map_async(quarter_second, data2, callback=callback)
pool1.close()
pool1.join()
pool2.close()
pool2.join()
# One Pool:
data = range(20)
pool = Pool(8)
start_time = time.time()
result = pool.map(quarter_second, data)
print('One pool result:', result)
print('One pool time:', time.time() - start_time)
pool.close()
pool.join()
打印:
Two pools result: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Two pools result: [100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
Two pools time: 1.4994373321533203
One pool result: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
One pool time: 1.4596436023712158
我重新运行了几次,大多数,但并非所有时候单池情况都稍好一些。但是我的桌面上还有许多其他进程 运行 会影响结果。我没有在总时间中包括创建处理池的实际时间。此外,根据池的大小和 iterable 参数,map 函数可以计算出略有不同的 chunksize 值,以便在没有显式时使用chunksize 参数被指定为这里的情况。但这对性能的影响可以忽略不计。 总而言之,根据我的假设,我真的看不出单池和双池方法之间有任何显着的性能差异。
我正在寻找一种方法来并行启动两个函数,每个函数都在一组给定的不同参数上执行。我使用 pool.map
来实现这一点。我创建了两个不同的进程,每个进程启动一个执行 map
的池。这行得通 - 执行顺序有点乱,但我会把它留到另一个问题。
现在我也找到了另一种方法here(见第一个答案)。他们只使用一个池并连续调用 map_async
两次。所以我想知道,是否有更好的方法来做到这一点?
因为我读过(遗憾的是我不记得在哪里)最好只使用一个池,这意味着第二种方法(只使用一个池)更好。但是当我测量时间时,第一种方法(在不同的进程中使用两个池)实际上要快一点。此外,在第一种方法中,函数实际上是 运行 并行的,而在第二种方法中,首先执行 map_async
的第一个调用,然后执行第二个调用。
这是我的测试代码:
from multiprocessing import Process, Pool
import time
import os
multiple_pools = True
data = list(range(1, 11))
def func_a(param):
print(f'running func_a in process {os.getpid()}')
print(f'passed argument: {param}')
print('calculating...\n')
time.sleep(1.5)
print('done\n')
def func_b(param):
print(f'running func_b in process {os.getpid()}')
print(f'passed argument: {param}')
print('calculating...\n')
time.sleep(2.5)
print('done\n')
def execute_func(func, param):
p = Pool(processes=8)
with p:
p.map(func, param)
if __name__ == '__main__':
if not multiple_pools:
t0 = time.time()
p = Pool(processes=8)
res = p.map_async(func_a, data)
res = p.map_async(func_b, data)
p.close()
p.join()
t1 = time.time()
dt = t1 -t0
print(f'time spent with one pool: {dt} s')
else:
t0 = time.time()
p1 = Process(target=execute_func, args=(func_a, data))
p2 = Process(target=execute_func, args=(func_b, data))
p1.start()
p2.start()
p1.join()
p2.join()
p1.close()
p2.close()
t1=time.time()
dt = t1 -t0
print(f'time spent with two pools, each inside an own process: {dt} s')
那么,我的问题又来了:有没有一种方法优于另一种方法?或者甚至 other/better 方法可以做到这一点?
首先,我假设当您使用两个池时,您将使用非阻塞 map_async
方法。我会说两个大小为 N 的池,每个池中您向所有任务都相同的每个池提交 M 个任务(即,就 CPU、I/O 等而言需要相同的资源) ) 应该是 或多或少 在时间上相当于将相同的 2 * M 任务提交到大小为 2 * N 的单个池。
下面的程序演示了这两种情况:
from multiprocessing import Pool
import time
QUARTER_SECOND_ITERATIONS = 5_000_000
def quarter_second(x):
sum = 0
for _ in range(QUARTER_SECOND_ITERATIONS):
sum += 1
return x * x
def callback(result):
global callback_count
print('Two pools result:', result)
callback_count += 1
if callback_count == 2:
# Both map-async calls have completed:
print('Two pools time:', time.time() - start_time)
# required for Windows:
if __name__ == '__main__':
data1 = range(10)
data2 = range(10, 20)
# Two pools:
pool1 = Pool(4)
pool2 = Pool(4)
callback_count = 0
start_time = time.time()
pool1.map_async(quarter_second, data1, callback=callback)
pool2.map_async(quarter_second, data2, callback=callback)
pool1.close()
pool1.join()
pool2.close()
pool2.join()
# One Pool:
data = range(20)
pool = Pool(8)
start_time = time.time()
result = pool.map(quarter_second, data)
print('One pool result:', result)
print('One pool time:', time.time() - start_time)
pool.close()
pool.join()
打印:
Two pools result: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Two pools result: [100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
Two pools time: 1.4994373321533203
One pool result: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
One pool time: 1.4596436023712158
我重新运行了几次,大多数,但并非所有时候单池情况都稍好一些。但是我的桌面上还有许多其他进程 运行 会影响结果。我没有在总时间中包括创建处理池的实际时间。此外,根据池的大小和 iterable 参数,map 函数可以计算出略有不同的 chunksize 值,以便在没有显式时使用chunksize 参数被指定为这里的情况。但这对性能的影响可以忽略不计。 总而言之,根据我的假设,我真的看不出单池和双池方法之间有任何显着的性能差异。