能否在 python 中有效地缩短 CPU 任务?

Can one efficiently thread short CPU tasks in python?

我正在尝试简化一个涉及一组可以并行完成的短任务的程序,在进入下一步之前必须比较这组任务的结果(这又涉及一组短任务,然后是另一组,等等)。由于这些任务的复杂程度,由于设置时间,不值得使用 multiprocessing。我想知道是否有另一种方法可以比线性更快地并行执行这些短任务。我不认为 I can find on this site that describes this problem for Python references this answer on memory sharing 回答了我的问题(或者即使回答了我也无法理解)。

为了说明我希望做什么,请考虑对从 0N 的一堆数字求和的问题。 (当然这可以通过分析来解决,我的观点是想出一个低内存但短期 CPU 密集型任务。)首先,线性方法就是:

def numbers(a,b):
    return(i for i in range(a,b))

def linear_sum(a):
    return(sum(numbers(a[0],a[1])))

n = 2000
linear_sum([0, n+1])
#2001000

对于线程,我想将问题分解成多个部分,然后可以分别求和然后组合,所以我的想法是得到一堆范围,在这些范围内求和

def get_ranges(i, Nprocess = 3):
    di = i // Nprocess
    j = np.append(np.arange(0, i, di), [i+1,])
    return([(j[k], j[k+1]) for k in range(len(j)-1)])

对于某些值n >> NProcesses,伪代码示例类似于

values = get_ranges(n)
x = []
for value in values:
   x.append(do_someting_parallel(value))
return(sum(x))

那么问题来了,如何实现do_someting_parallel?对于 multiprocessing,我们可以这样做:

from multiprocessing import Pool as ThreadPool

def mpc_thread_sum(i, Nprocess = 3):
    values = get_ranges(i)
    pool = ThreadPool(Nprocess)
    results = pool.map(linear_sum, values)
    pool.close()
    pool.join()
    return(sum(results))

print(mpc_thread_sum(2000))
# 2001000

下图显示了所描述的不同方法的性能。有没有一种方法可以加快 multiprocessing 仍然比线性慢的区域的计算速度,或者这是 Python 的 GIL 中的并行化限制?我怀疑答案可能是我达到了极限,但想在这里问一下以确定。我尝试了 multiprocessing.dummyasynciothreadingThreadPoolExecutor(来自 concurrent.futures)。为简洁起见,我省略了代码,但所有代码都显示了与线性方法相当的执行时间。所有这些都是为 I/O 任务设计的,因此受 GIL 限制。

我的第一个观察是函数 numbers 的 运行ning 时间可以通过简单地定义为:

大致减少一半
def numbers(a, b):
    return range(a, b)

其次,如果不借助 C 语言 [=50],使用纯 Python 计算数字总和的 100% CPU 密集型任务永远不会表现得更好 [=50] =]time 库(例如 numpy),因为争夺全局解释器锁 (GIL),这会阻止任何类型的并行化发生(并且 asyncio 只使用一个线程来处理)。

第三,针对 100% CPU 任务 运行 宁纯 Python 代码的唯一方法是使用多处理。但是在创建进程池时有 CPU 开销,在将参数从主进程传递到地址 space 时有 CPU 开销,进程池的进程在其中 运行ning在传回结果时再次进入和开销。因此,为了提高性能,辅助函数 linear_sum 不能微不足道;它必须需要足够的 CPU 处理才能保证我刚才提到的额外开销。

以下基准测试 运行 是辅助函数,已重命名为 compute_sum,现在接受 range 作为其参数。为了进一步减少开销,我引入了一个函数 split,它将采用传递的 range 参数并生成多个 range 实例,从而无需使用 numpy 和生成数组。基准测试使用单线程(线性)、多线程池和多处理池计算总和,并且 n = 2000n = 50_000_000 是 运行 两次。基准显示所有进程的运行时间和总 CPU 时间。

对于 n = 2000,正如预期的那样,多处理的性能比线性和多线程都要差。对于 n = 50_000_000,由于上述额外开销,多处理的总 CPU 时间比线性和多线程的时间略长。但是现在经过的时间已经大大减少了。对于 n 的两个值,多线程是失败者。

from multiprocessing.pool import Pool, ThreadPool

import time

def split(iterable, n):
    k, m = divmod(len(iterable), n)
    return (iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

def compute_sum(r):
    t = time.process_time()
    return (sum(r), time.process_time() - t)

if __name__ == '__main__':
    for n in (2000, 50_000_000):
        r = range(0, n+1)

        t1 = time.time()
        s, cpu = compute_sum(r)
        elapsed = time.time() - t1
        print(f'n = {n}, linear elapsed time = {elapsed}, total     cpu time = {cpu}, sum = {s}')

        t1 = time.time()
        t2 = time.process_time()
        thread_pool = ThreadPool(4)
        s = 0
        for return_value, process_time in thread_pool.imap_unordered(compute_sum, split(r, 4)):
            s += return_value
        elapsed = time.time() - t1
        cpu = time.process_time() - t2
        print(f'n = {n}, thread pool elapsed time = {elapsed}, total cpu time = {cpu}, sum = {s}')
        thread_pool.close()
        thread_pool.join()

        t1 = time.time()
        t2 = time.process_time()
        pool = Pool(4)
        s = 0
        cpu = 0
        for return_value, process_time in pool.imap_unordered(compute_sum, split(r, 4)):
            s += return_value
            cpu += process_time
        elapsed = time.time() - t1
        cpu += time.process_time() - t2
        print(f'n = {n}, multiprocessing elapsed time = {elapsed}, total cpu time = {cpu}, sum = {s}')
        pool.close()
        pool.join()
        print()

打印:

n = 2000, linear elapsed time = 0.0, total cpu time = 0.0, sum = 2001000
n = 2000, thread pool elapsed time = 0.00700068473815918, total cpu time = 0.015625, sum = 2001000
n = 2000, multiprocessing elapsed time = 0.13200139999389648, total cpu time = 0.015625, sum = 2001000

n = 50000000, linear elapsed time = 2.0311124324798584, total cpu time = 2.03125, sum = 1250000025000000
n = 50000000, thread pool elapsed time = 2.050999164581299, total cpu time = 2.046875, sum = 1250000025000000
n = 50000000, multiprocessing elapsed time = 0.7579991817474365, total cpu time = 2.359375, sum = 125000002500000