能否在 python 中有效地缩短 CPU 任务?
Can one efficiently thread short CPU tasks in python?
我正在尝试简化一个涉及一组可以并行完成的短任务的程序,在进入下一步之前必须比较这组任务的结果(这又涉及一组短任务,然后是另一组,等等)。由于这些任务的复杂程度,由于设置时间,不值得使用 multiprocessing
。我想知道是否有另一种方法可以比线性更快地并行执行这些短任务。我不认为 I can find on this site that describes this problem for Python references this answer on memory sharing 回答了我的问题(或者即使回答了我也无法理解)。
为了说明我希望做什么,请考虑对从 0
到 N
的一堆数字求和的问题。 (当然这可以通过分析来解决,我的观点是想出一个低内存但短期 CPU 密集型任务。)首先,线性方法就是:
def numbers(a,b):
return(i for i in range(a,b))
def linear_sum(a):
return(sum(numbers(a[0],a[1])))
n = 2000
linear_sum([0, n+1])
#2001000
对于线程,我想将问题分解成多个部分,然后可以分别求和然后组合,所以我的想法是得到一堆范围,在这些范围内求和
def get_ranges(i, Nprocess = 3):
di = i // Nprocess
j = np.append(np.arange(0, i, di), [i+1,])
return([(j[k], j[k+1]) for k in range(len(j)-1)])
对于某些值n >> NProcesses
,伪代码示例类似于
values = get_ranges(n)
x = []
for value in values:
x.append(do_someting_parallel(value))
return(sum(x))
那么问题来了,如何实现do_someting_parallel
?对于 multiprocessing
,我们可以这样做:
from multiprocessing import Pool as ThreadPool
def mpc_thread_sum(i, Nprocess = 3):
values = get_ranges(i)
pool = ThreadPool(Nprocess)
results = pool.map(linear_sum, values)
pool.close()
pool.join()
return(sum(results))
print(mpc_thread_sum(2000))
# 2001000
下图显示了所描述的不同方法的性能。有没有一种方法可以加快 multiprocessing
仍然比线性慢的区域的计算速度,或者这是 Python 的 GIL 中的并行化限制?我怀疑答案可能是我达到了极限,但想在这里问一下以确定。我尝试了 multiprocessing.dummy
、asyncio
、threading
和 ThreadPoolExecutor
(来自 concurrent.futures
)。为简洁起见,我省略了代码,但所有代码都显示了与线性方法相当的执行时间。所有这些都是为 I/O 任务设计的,因此受 GIL 限制。
我的第一个观察是函数 numbers
的 运行ning 时间可以通过简单地定义为:
大致减少一半
def numbers(a, b):
return range(a, b)
其次,如果不借助 C 语言 [=50],使用纯 Python 计算数字总和的 100% CPU 密集型任务永远不会表现得更好 [=50] =]time 库(例如 numpy
),因为争夺全局解释器锁 (GIL),这会阻止任何类型的并行化发生(并且 asyncio
只使用一个线程来处理)。
第三,针对 100% CPU 任务 运行 宁纯 Python 代码的唯一方法是使用多处理。但是在创建进程池时有 CPU 开销,在将参数从主进程传递到地址 space 时有 CPU 开销,进程池的进程在其中 运行ning在传回结果时再次进入和开销。因此,为了提高性能,辅助函数 linear_sum
不能微不足道;它必须需要足够的 CPU 处理才能保证我刚才提到的额外开销。
以下基准测试 运行 是辅助函数,已重命名为 compute_sum
,现在接受 range
作为其参数。为了进一步减少开销,我引入了一个函数 split
,它将采用传递的 range
参数并生成多个 range
实例,从而无需使用 numpy
和生成数组。基准测试使用单线程(线性)、多线程池和多处理池计算总和,并且 n = 2000
和 n = 50_000_000
是 运行 两次。基准显示所有进程的运行时间和总 CPU 时间。
对于 n = 2000
,正如预期的那样,多处理的性能比线性和多线程都要差。对于 n = 50_000_000
,由于上述额外开销,多处理的总 CPU 时间比线性和多线程的时间略长。但是现在经过的时间已经大大减少了。对于 n
的两个值,多线程是失败者。
from multiprocessing.pool import Pool, ThreadPool
import time
def split(iterable, n):
k, m = divmod(len(iterable), n)
return (iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))
def compute_sum(r):
t = time.process_time()
return (sum(r), time.process_time() - t)
if __name__ == '__main__':
for n in (2000, 50_000_000):
r = range(0, n+1)
t1 = time.time()
s, cpu = compute_sum(r)
elapsed = time.time() - t1
print(f'n = {n}, linear elapsed time = {elapsed}, total cpu time = {cpu}, sum = {s}')
t1 = time.time()
t2 = time.process_time()
thread_pool = ThreadPool(4)
s = 0
for return_value, process_time in thread_pool.imap_unordered(compute_sum, split(r, 4)):
s += return_value
elapsed = time.time() - t1
cpu = time.process_time() - t2
print(f'n = {n}, thread pool elapsed time = {elapsed}, total cpu time = {cpu}, sum = {s}')
thread_pool.close()
thread_pool.join()
t1 = time.time()
t2 = time.process_time()
pool = Pool(4)
s = 0
cpu = 0
for return_value, process_time in pool.imap_unordered(compute_sum, split(r, 4)):
s += return_value
cpu += process_time
elapsed = time.time() - t1
cpu += time.process_time() - t2
print(f'n = {n}, multiprocessing elapsed time = {elapsed}, total cpu time = {cpu}, sum = {s}')
pool.close()
pool.join()
print()
打印:
n = 2000, linear elapsed time = 0.0, total cpu time = 0.0, sum = 2001000
n = 2000, thread pool elapsed time = 0.00700068473815918, total cpu time = 0.015625, sum = 2001000
n = 2000, multiprocessing elapsed time = 0.13200139999389648, total cpu time = 0.015625, sum = 2001000
n = 50000000, linear elapsed time = 2.0311124324798584, total cpu time = 2.03125, sum = 1250000025000000
n = 50000000, thread pool elapsed time = 2.050999164581299, total cpu time = 2.046875, sum = 1250000025000000
n = 50000000, multiprocessing elapsed time = 0.7579991817474365, total cpu time = 2.359375, sum = 125000002500000
我正在尝试简化一个涉及一组可以并行完成的短任务的程序,在进入下一步之前必须比较这组任务的结果(这又涉及一组短任务,然后是另一组,等等)。由于这些任务的复杂程度,由于设置时间,不值得使用 multiprocessing
。我想知道是否有另一种方法可以比线性更快地并行执行这些短任务。我不认为
为了说明我希望做什么,请考虑对从 0
到 N
的一堆数字求和的问题。 (当然这可以通过分析来解决,我的观点是想出一个低内存但短期 CPU 密集型任务。)首先,线性方法就是:
def numbers(a,b):
return(i for i in range(a,b))
def linear_sum(a):
return(sum(numbers(a[0],a[1])))
n = 2000
linear_sum([0, n+1])
#2001000
对于线程,我想将问题分解成多个部分,然后可以分别求和然后组合,所以我的想法是得到一堆范围,在这些范围内求和
def get_ranges(i, Nprocess = 3):
di = i // Nprocess
j = np.append(np.arange(0, i, di), [i+1,])
return([(j[k], j[k+1]) for k in range(len(j)-1)])
对于某些值n >> NProcesses
,伪代码示例类似于
values = get_ranges(n)
x = []
for value in values:
x.append(do_someting_parallel(value))
return(sum(x))
那么问题来了,如何实现do_someting_parallel
?对于 multiprocessing
,我们可以这样做:
from multiprocessing import Pool as ThreadPool
def mpc_thread_sum(i, Nprocess = 3):
values = get_ranges(i)
pool = ThreadPool(Nprocess)
results = pool.map(linear_sum, values)
pool.close()
pool.join()
return(sum(results))
print(mpc_thread_sum(2000))
# 2001000
下图显示了所描述的不同方法的性能。有没有一种方法可以加快 multiprocessing
仍然比线性慢的区域的计算速度,或者这是 Python 的 GIL 中的并行化限制?我怀疑答案可能是我达到了极限,但想在这里问一下以确定。我尝试了 multiprocessing.dummy
、asyncio
、threading
和 ThreadPoolExecutor
(来自 concurrent.futures
)。为简洁起见,我省略了代码,但所有代码都显示了与线性方法相当的执行时间。所有这些都是为 I/O 任务设计的,因此受 GIL 限制。
我的第一个观察是函数 numbers
的 运行ning 时间可以通过简单地定义为:
def numbers(a, b):
return range(a, b)
其次,如果不借助 C 语言 [=50],使用纯 Python 计算数字总和的 100% CPU 密集型任务永远不会表现得更好 [=50] =]time 库(例如 numpy
),因为争夺全局解释器锁 (GIL),这会阻止任何类型的并行化发生(并且 asyncio
只使用一个线程来处理)。
第三,针对 100% CPU 任务 运行 宁纯 Python 代码的唯一方法是使用多处理。但是在创建进程池时有 CPU 开销,在将参数从主进程传递到地址 space 时有 CPU 开销,进程池的进程在其中 运行ning在传回结果时再次进入和开销。因此,为了提高性能,辅助函数 linear_sum
不能微不足道;它必须需要足够的 CPU 处理才能保证我刚才提到的额外开销。
以下基准测试 运行 是辅助函数,已重命名为 compute_sum
,现在接受 range
作为其参数。为了进一步减少开销,我引入了一个函数 split
,它将采用传递的 range
参数并生成多个 range
实例,从而无需使用 numpy
和生成数组。基准测试使用单线程(线性)、多线程池和多处理池计算总和,并且 n = 2000
和 n = 50_000_000
是 运行 两次。基准显示所有进程的运行时间和总 CPU 时间。
对于 n = 2000
,正如预期的那样,多处理的性能比线性和多线程都要差。对于 n = 50_000_000
,由于上述额外开销,多处理的总 CPU 时间比线性和多线程的时间略长。但是现在经过的时间已经大大减少了。对于 n
的两个值,多线程是失败者。
from multiprocessing.pool import Pool, ThreadPool
import time
def split(iterable, n):
k, m = divmod(len(iterable), n)
return (iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))
def compute_sum(r):
t = time.process_time()
return (sum(r), time.process_time() - t)
if __name__ == '__main__':
for n in (2000, 50_000_000):
r = range(0, n+1)
t1 = time.time()
s, cpu = compute_sum(r)
elapsed = time.time() - t1
print(f'n = {n}, linear elapsed time = {elapsed}, total cpu time = {cpu}, sum = {s}')
t1 = time.time()
t2 = time.process_time()
thread_pool = ThreadPool(4)
s = 0
for return_value, process_time in thread_pool.imap_unordered(compute_sum, split(r, 4)):
s += return_value
elapsed = time.time() - t1
cpu = time.process_time() - t2
print(f'n = {n}, thread pool elapsed time = {elapsed}, total cpu time = {cpu}, sum = {s}')
thread_pool.close()
thread_pool.join()
t1 = time.time()
t2 = time.process_time()
pool = Pool(4)
s = 0
cpu = 0
for return_value, process_time in pool.imap_unordered(compute_sum, split(r, 4)):
s += return_value
cpu += process_time
elapsed = time.time() - t1
cpu += time.process_time() - t2
print(f'n = {n}, multiprocessing elapsed time = {elapsed}, total cpu time = {cpu}, sum = {s}')
pool.close()
pool.join()
print()
打印:
n = 2000, linear elapsed time = 0.0, total cpu time = 0.0, sum = 2001000
n = 2000, thread pool elapsed time = 0.00700068473815918, total cpu time = 0.015625, sum = 2001000
n = 2000, multiprocessing elapsed time = 0.13200139999389648, total cpu time = 0.015625, sum = 2001000
n = 50000000, linear elapsed time = 2.0311124324798584, total cpu time = 2.03125, sum = 1250000025000000
n = 50000000, thread pool elapsed time = 2.050999164581299, total cpu time = 2.046875, sum = 1250000025000000
n = 50000000, multiprocessing elapsed time = 0.7579991817474365, total cpu time = 2.359375, sum = 125000002500000