使用并发的线程池执行器:对不同数量的工人没有改善

Thread Pool Executor using Concurrent: no improvement for various number of workers

我正在尝试使用 Concurrent 并行执行任务。请在下面找到一段代码:

import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
import concurrent.futures

# num CPUs
cpu_num = len(os.sched_getaffinity(0))
print("Number of cpu available : ",cpu_num)

# max_Worker = cpu_num
max_Worker = 1

# A fake input array
n=1000000
array = list(range(n))
results = []

# A fake function being applied to each element of array 
def task(i):
  return i**2 

x = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_Worker) as executor:
  features = {executor.submit(task, j) for j in array}

  # the real function is heavy and we need to be sure of completeness of each run
  for future in concurrent.futures.as_completed(features):
    results.append(future.result())
      
results = [future.result() for future in features]
y = time.time()

print('=========================================')
print(f"Train data preparation time (s): {(y-x)}")
print('=========================================')

现在我的问题是,

  1. 虽然没有报错,但是是correct/optimized吗?
  2. 在玩工人数的时候,好像没有 速度的提高(例如,1 对 16,没有区别)。然后, 有什么问题,如何解决?

提前致谢,

请参阅我对您的问题的评论。对于我在该评论中提到的开销,您还需要在创建进程池本身时添加 oberhead。

以下是具有多个结果的基准测试。第一个是仅调用 worker 函数 task 100000 次并创建 results 列表并打印出该列表的最后一个元素的计时。为什么我将调用 task 的次数从 1000000 次减少到 100000 次就很明显了。

下一次尝试是使用 multiprocessing 通过 submit 方法使用 ProcessPoolExecutor 完成同样的事情,然后处理返回的 Future 个实例。

下一次尝试是使用 map 方法,并使用默认的 chunksize 参数 1。理解这个论点很重要。 chunksize 值为 1 时,传递给 map 方法的 iterable 的每个元素都单独写入队列任务作为一个块,由池中的进程处理。当池进程空闲寻找工作时,它从队列中拉出下一个要执行的任务块,处理组成该块的每个任务,然后再次空闲。当通过 map 提交大量提交任务时,chunksize 值为 1 效率低下。您会期望它的性能相当于对 iterable.

的每个元素重复发出 submit 调用

下一次尝试指定一个 chunksize 值,该值或多或少近似于 multiprocessing 包中 Pool class 使用的 map 函数的值默认使用。正如您所看到的,改进是显着的,但仍然不是非多处理情况下的改进。

最后一次尝试使用包 multiprocessing 及其 multiprocessing.pool.Pool class 提供的多处理功能。此基准测试的不同之处在于,当未指定 chunksize 参数时,其 map 函数使用更智能的默认值 chunksize

import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
from multiprocessing import Pool

# A fake function being applied to each element of array
def task(i):
  return i**2

# required for Windows:
if __name__ == '__main__':
    n=100000

    t1 = time.time()
    results = [task(i) for i in range(n)]
    print('Non-multiprocessing time:', time.time() - t1, results[-1])

    # num CPUs
    cpu_num = os.cpu_count()
    print("Number of CPUs available: ",cpu_num)

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        futures = [executor.submit(task, i) for i in range(n)]
        results = [future.result() for future in futures]
    print('Multiprocessing time using submit:', time.time() - t1,  results[-1])

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n)))
    print('Multiprocessing time using map:', time.time() - t1, results[-1])

    t1 = time.time()
    chunksize = n // (4 * cpu_num)
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n), chunksize=chunksize))
    print(f'Multiprocessing time using map: {time.time() - t1}, chunksize: {chunksize}', results[-1])

    t1 = time.time()
    with Pool(cpu_num) as executor:
        results = executor.map(task, range(n))
    print('Multiprocessing time using Pool.map:', time.time() - t1, results[-1])

打印:

Non-multiprocessing time: 0.027019739151000977 9999800001
Number of CPUs available:  8
Multiprocessing time using submit: 77.34723353385925 9999800001
Multiprocessing time using map: 79.52981925010681 9999800001
Multiprocessing time using map: 0.30500149726867676, chunksize: 3125 9999800001
Multiprocessing time using Pool.map: 0.2799997329711914 9999800001

更新

以下基准测试使用 task 的版本,该版本非常 CPU 密集并显示了多处理的好处。对于这个小的 iterable 大小 (100),它似乎也会强制 chunksize 值为 1 Pool.map 情况(它会默认情况下计算 chunksize 值为 4),性能稍好。

import os
import time
from concurrent.futures import ProcessPoolExecutor as PE
from multiprocessing import Pool

# A fake function being applied to each element of array
def task(i):
    for _ in range(1_000_000):
        result = i ** 2
    return result

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, pool_size * 4)
    if remainder:
        chunksize += 1
    return chunksize

# required for Windows:
if __name__ == '__main__':
    n = 100
    cpu_num = os.cpu_count()
    chunksize = compute_chunksize(n, cpu_num)

    t1 = time.time()
    results = [task(i) for i in range(n)]
    t2 = time.time()
    print('Non-multiprocessing time:', t2 - t1, results[-1])

    # num CPUs
    print("Number of CPUs available: ",cpu_num)

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        futures = [executor.submit(task, i) for i in range(n)]
        results = [future.result() for future in futures]
        t2 = time.time()
    print('Multiprocessing time using submit:', t2 - t1,  results[-1])

    t1 = time.time()
    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n)))
        t2 = time.time()
    print('Multiprocessing time using map:', t2 - t1, results[-1])

    t1 = time.time()

    with PE(max_workers=cpu_num) as executor:
        results = list(executor.map(task, range(n), chunksize=chunksize))
        t2 = time.time()
    print(f'Multiprocessing time using map: {t2 - t1}, chunksize: {chunksize}', results[-1])

    t1 = time.time()
    with Pool(cpu_num) as executor:
        results = executor.map(task, range(n))
        t2 = time.time()
    print('Multiprocessing time using Pool.map:', t2 - t1, results[-1])

    t1 = time.time()
    with Pool(cpu_num) as executor:
        results = executor.map(task, range(n), chunksize=1)
        t2 = time.time()
    print('Multiprocessing time using Pool.map (chunksize=1):', t2 - t1, results[-1])

打印:

Non-multiprocessing time: 23.12758779525757 9801
Number of CPUs available:  8
Multiprocessing time using submit: 5.336004018783569 9801
Multiprocessing time using map: 5.364996671676636 9801
Multiprocessing time using map: 5.444890975952148, chunksize: 4 9801
Multiprocessing time using Pool.map: 5.400001287460327 9801
Multiprocessing time using Pool.map (chunksize=1): 4.698001146316528 9801