Thread Pool Executor using Concurrent: no improvement for various number of workers
I am trying to execute tasks in parallel using concurrent.futures. Please find a piece of code below:
import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
import concurrent.futures

# num CPUs
cpu_num = len(os.sched_getaffinity(0))
print("Number of cpu available : ", cpu_num)

# max_Worker = cpu_num
max_Worker = 1

# A fake input array
n = 1000000
array = list(range(n))
results = []

# A fake function being applied to each element of array
def task(i):
    return i**2

x = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_Worker) as executor:
    features = {executor.submit(task, j) for j in array}
    # the real function is heavy and we need to be sure of completeness of each run
    for future in concurrent.futures.as_completed(features):
        results.append(future.result())

results = [future.result() for future in features]
y = time.time()
print('=========================================')
print(f"Train data preparation time (s): {(y-x)}")
print('=========================================')
Now my questions are:
- Although there are no errors, is it correct/optimized?
- While playing with the number of workers, there seems to be no improvement in speed (e.g., 1 vs. 16 makes no difference). What is wrong, then, and how can it be fixed?
Thanks in advance,
Please see my comment on your question. To the overhead I mentioned in that comment, you also need to add the overhead of creating the process pool itself.
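As a rough illustration of that pool-creation overhead on its own, you can time an executor that does essentially no work (a minimal sketch; the measured figure varies widely by platform and by the fork/spawn start method):

```python
import time
from concurrent.futures import ProcessPoolExecutor

if __name__ == '__main__':
    t1 = time.time()
    # Creating the pool spawns worker processes; submitting one trivial
    # task and waiting on it forces the workers to actually start up
    # before we stop timing.
    with ProcessPoolExecutor(max_workers=8) as executor:
        executor.submit(int, 0).result()
    print('Pool creation + teardown time:', time.time() - t1)
```

On spawn-based platforms (Windows, macOS) this alone can take a noticeable fraction of a second, which is why trivial tasks never pay for themselves.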
The following are benchmarks with several results. The first is a timing of simply calling the worker function task 100000 times, building the results list, and printing out the last element of that list. It will become apparent why I reduced the number of calls to task from 1000000 to 100000.
The next attempt uses multiprocessing to accomplish the same thing with a ProcessPoolExecutor via the submit method, then processes the Future instances that are returned.
The next attempt uses the map method with the default chunksize argument of 1. It is important to understand this argument. With a chunksize value of 1, each element of the iterable passed to the map method is written individually to the task queue as a chunk to be processed by a process in the pool. When a pool process becomes idle looking for work, it pulls the next chunk of tasks from the queue, processes each task comprising the chunk, and then becomes idle again. When a large number of tasks are being submitted via map, a chunksize value of 1 is inefficient; you would expect its performance to be equivalent to repeatedly issuing submit calls for each element of the iterable.
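To make the mechanics concrete, here is a minimal sketch (not the actual pool implementation) of how an iterable gets batched for a given chunksize; with chunksize=1 every element becomes its own queued task, each paying a full round of inter-process communication:

```python
def chunked(iterable, chunksize):
    """Split an iterable into lists of at most chunksize elements,
    mimicking how a pool batches tasks onto its internal queue."""
    items = list(iterable)
    return [items[i:i + chunksize] for i in range(0, len(items), chunksize)]

# chunksize=1: one queue entry (one round of IPC) per element
print(len(chunked(range(8), 1)))   # 8 chunks
# chunksize=4: only two queue entries for the same work
print(chunked(range(8), 4))        # [[0, 1, 2, 3], [4, 5, 6, 7]]
```

With a million trivial tasks and chunksize=1, the per-chunk queueing and IPC cost dwarfs the i**2 computation itself.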
The next attempt specifies a chunksize value that more or less approximates the default value that the map function of the Pool class in the multiprocessing package would use. As you can see, the improvement is dramatic, but still not an improvement over the non-multiprocessing case.
The final attempt uses the multiprocessing facility provided by the multiprocessing package and its multiprocessing.pool.Pool class. The difference in this benchmark is that its map function uses a more intelligent default chunksize when no chunksize argument is specified.
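That smarter default can be sketched as the following computation (based on my reading of the CPython source for Pool._map_async; details may vary by version): roughly four chunks per worker, rounded up.

```python
def default_pool_chunksize(iterable_size, pool_size):
    # Pool.map aims for about 4 chunks per worker process,
    # rounding up so no tasks are left over.
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

print(default_pool_chunksize(100000, 8))  # 3125
```

For n=100000 and 8 CPUs this yields 3125, which is why the explicit chunksize in the benchmark below performs comparably to Pool.map.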
import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
from multiprocessing import Pool

# A fake function being applied to each element of array
def task(i):
    return i**2

# required for Windows:
if __name__ == '__main__':
    n = 100000

    t1 = time.time()
    results = [task(i) for i in range(n)]
    print('Non-multiprocessing time:', time.time() - t1, results[-1])

    # num CPUs
    cpu_num = os.cpu_count()
    print("Number of CPUs available: ", cpu_num)

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        futures = [executor.submit(task, i) for i in range(n)]
        results = [future.result() for future in futures]
    print('Multiprocessing time using submit:', time.time() - t1, results[-1])

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n)))
    print('Multiprocessing time using map:', time.time() - t1, results[-1])

    t1 = time.time()
    chunksize = n // (4 * cpu_num)
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n), chunksize=chunksize))
    print(f'Multiprocessing time using map: {time.time() - t1}, chunksize: {chunksize}', results[-1])

    t1 = time.time()
    with Pool(cpu_num) as executor:
        results = executor.map(task, range(n))
    print('Multiprocessing time using Pool.map:', time.time() - t1, results[-1])
Prints:
Non-multiprocessing time: 0.027019739151000977 9999800001
Number of CPUs available: 8
Multiprocessing time using submit: 77.34723353385925 9999800001
Multiprocessing time using map: 79.52981925010681 9999800001
Multiprocessing time using map: 0.30500149726867676, chunksize: 3125 9999800001
Multiprocessing time using Pool.map: 0.2799997329711914 9999800001
Update
The following benchmark uses a version of task that is very CPU-intensive and shows the benefit of multiprocessing. It would also appear that for this small iterable size (100), forcing a chunksize value of 1 in the Pool.map case (it would by default compute a chunksize value of 4) is slightly more performant.
import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
from multiprocessing import Pool

# A fake function being applied to each element of array
def task(i):
    for _ in range(1_000_000):
        result = i ** 2
    return result

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, pool_size * 4)
    if remainder:
        chunksize += 1
    return chunksize

# required for Windows:
if __name__ == '__main__':
    n = 100
    cpu_num = os.cpu_count()
    chunksize = compute_chunksize(n, cpu_num)

    t1 = time.time()
    results = [task(i) for i in range(n)]
    t2 = time.time()
    print('Non-multiprocessing time:', t2 - t1, results[-1])

    # num CPUs
    print("Number of CPUs available: ", cpu_num)

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        futures = [executor.submit(task, i) for i in range(n)]
        results = [future.result() for future in futures]
    t2 = time.time()
    print('Multiprocessing time using submit:', t2 - t1, results[-1])

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n)))
    t2 = time.time()
    print('Multiprocessing time using map:', t2 - t1, results[-1])

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n), chunksize=chunksize))
    t2 = time.time()
    print(f'Multiprocessing time using map: {t2 - t1}, chunksize: {chunksize}', results[-1])

    t1 = time.time()
    with Pool(cpu_num) as executor:
        results = executor.map(task, range(n))
    t2 = time.time()
    print('Multiprocessing time using Pool.map:', t2 - t1, results[-1])

    t1 = time.time()
    with Pool(cpu_num) as executor:
        results = executor.map(task, range(n), chunksize=1)
    t2 = time.time()
    print('Multiprocessing time using Pool.map (chunksize=1):', t2 - t1, results[-1])
Prints:
Non-multiprocessing time: 23.12758779525757 9801
Number of CPUs available: 8
Multiprocessing time using submit: 5.336004018783569 9801
Multiprocessing time using map: 5.364996671676636 9801
Multiprocessing time using map: 5.444890975952148, chunksize: 4 9801
Multiprocessing time using Pool.map: 5.400001287460327 9801
Multiprocessing time using Pool.map (chunksize=1): 4.698001146316528 9801