并发期货:何时以及如何实施?

Concurrent Futures: When and how to implement?

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
import numpy as np
import time

#creating iterable
testDict = {}
for i in range(1000):
    testDict[i] = np.random.randint(1,10)
    

#default method
stime = time.time()    
newdict = []

for k, v in testDict.items():
    for i in range(1000):
        v = np.tanh(v)
    newdict.append(v)
    
etime = time.time()
print(etime - stime) 
#output: 1.1139910221099854  



#multi processing
stime = time.time()
testresult = []

def f(item):
    x = item[1]
    for i in range(1000):
        x = np.tanh(x)
    return x

def main(testDict):
    with ProcessPoolExecutor(max_workers = 8) as executor:
        futures = [executor.submit(f, item) for item in testDict.items()]
        for future in as_completed(futures):
            testresult.append(future.result())
            
if __name__ == '__main__':
    main(testDict)    

etime = time.time()
print(etime - stime)
#output: 3.4509658813476562

学习多处理和测试的东西。 运行 检查我是否正确实施的测试。查看所用的输出时间,并发方法慢了 3 倍。那怎么了?

我的 objective 是并行化一个主要在大约 500 个项目的字典上运行的脚本。在每个循环中,处理和更新这 500 个项目的值。假设循环了 5000 代。 None 的 k,v 对与其他 k,v 对相互作用。 [它是一种遗传算法]。

我也在查看有关如何并行化上述 objective 的指南。如果我在我的遗传算法代码中对我的每个函数使用正确的并发期货方法,其中每个函数都接受一个字典的输入并输出一个新字典,它会有用吗?任何 guides/resources/help 表示赞赏。

编辑:如果我 运行 这个例子:https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example,它比默认的 for 循环检查多花 3 倍的时间来求解。

这里有几个基本问​​题,您使用的是 numpy,但没有向量化您的计算。您不会通过在此处编写代码的方式受益于 numpy 的速度优势,还不如使用标准库 math 模块,对于这种代码风格,它比 numpy 更快:

# 0.089sec
import math
for k, v in testDict.items():
    for i in range(1000):
        v = math.tanh(v)
    newdict.append(v)

一旦你向量化操作,你就会看到 numpy 的好处:

# 0.016sec
for k, v in testDict.items():
    arr = no.full(1000, v)
    arr2 = np.tanh(arr)
    newdict.append(arr2[-1])

为了比较,您的原始单线程代码 运行s 在我的测试机器上耗时 1.171 秒。正如您在此处看到的,如果使用不当,NumPy 的运行速度甚至可能比纯 Python.

慢几个数量级

现在来谈谈为什么你会看到你所看到的。

老实说,我无法复制您的计时结果。对于我的 macOS Python 3.6),您的原始多处理代码 运行s 在 0.299 秒内,这比单进程代码更快。但是,如果我不得不猜测,您可能正在使用 Windows?在某些平台,如 Windows,创建一个 child 进程并为 运行 多处理任务设置环境是非常昂贵的,因此对持续时间少于几秒的任务使用多处理是可疑的好处。如果您对原因感兴趣,read here.

此外,在 Python 3.8 或 Windows 之后缺少可用 fork() 的平台,例如 MacOS,当您使用多处理时,child 进程必须重新导入模块,所以如果你将两个代码放在同一个文件中,它必须 运行 在 child 进程中的单线程代码才能 运行 多处理代码。您可能希望将测试代码放入函数中并使用 if __name__ == "__main__" 块保护顶层代码。在使用 Python 3.8 或更高版本的 Mac 上,如果您不调用 Mac 的 non-fork-safe,您还可以通过调用 multiprocessing.set_start_method("fork") 恢复使用 fork 方法框架库。

说完这些,开始你的标题问题。

当您使用多处理时,您需要将数据复制到 child 进程并返回到主进程以检索结果,并且生成 child 进程会产生成本。要从多处理中获益,您需要设计您的工作负载,使这部分成本可以忽略不计。

如果您的数据来自外部源,请尝试在 child 进程中加载​​数据,而不是让主进程加载数据然后将其传输到 child 进程,让主进程进程告诉 child 如何获取它的数据片段。在这里,您在主进程中生成了 testDict,所以如果可以的话,将其并行化并将它们移至 children。

此外,由于您使用的是 numpy,如果您正确地向量化操作,numpy 将在执行向量化操作时释放 GIL,因此您可以只使用多线程。由于 numpy 在矢量操作期间不保留 GIL,因此您可以在单个 Python 进程中利用多个线程,并且不需要将数据分叉或复制到 child 进程,因为线程共享内存。