python concurrent.futures.ProcessPoolExecutor:.submit() 与 .map() 的性能对比

python concurrent.futures.ProcessPoolExecutor: Performance of .submit() vs .map()

我正在使用 concurrent.futures.ProcessPoolExecutor 从数字范围中查找数字的出现。目的是调查从并发中获得的加速性能。为了对性能进行基准测试,我有一个控件 - 一个用于执行所述任务的串行代码(如下所示)。我写了 2 个并发代码,一个使用 concurrent.futures.ProcessPoolExecutor.submit(),另一个使用 concurrent.futures.ProcessPoolExecutor.map() 来执行相同的任务。它们如下所示。关于编写前者和后者的建议可分别参见相关的两个问题(原文链接在转载时丢失)。

三个代码的任务都是求数字 5 在 0 到 1E8 的数字范围内出现的次数。.submit() 和 .map() 都分配了 6 个工人,.map() 的块大小为 10,000。离散工作负载的方式在两个并发代码中是相同的。但是,用于查找出现次数的函数是不同的。这是因为将参数传递给 .submit() 和 .map() 所调用的函数的方式不同。

所有 3 个代码报告的出现次数相同,即 56,953,279 次。但是,完成任务所需的时间却大不相同。 .submit() 的执行速度比对照快 2 倍,而 .map() 完成任务的时间是对照的两倍。

问题:

  1. 我想知道 .map() 的缓慢性能是我编码的人为因素还是它本身就很慢?如果是前者,我该如何改进它。我只是惊讶于它的表现比对照慢,因为这样就没有太多动力去使用它。
  2. 我想知道是否有任何方法可以使 .submit() 代码执行得更快。我有一个条件是函数 _concurrent_submit() 必须 return 一个包含数字 5 的 numbers/occurrences 的可迭代对象。

基准测试结果

concurrent.futures.ProcessPoolExecutor.submit()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from traceback import print_exc

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_submit(nmax, number, workers):
    '''Find the occurrences of a given number in a number range in a
       parallelised manner using concurrent.futures.ProcessPoolExecutor.submit.

       nmax    -- exclusive upper bound of the searched range [0, nmax).
       number  -- digit to look for, as a string (e.g. '5').
       workers -- number of worker processes (and of submitted chunks).

       Returns an ordered list of all numbers in [0, nmax) containing `number`.
    '''
    # 1. Local variables
    start = time()
    chunk = nmax // workers  # size of each worker's sub-range
    futures = []
    found = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool.
        for i in range(workers):
            cstart = chunk * i
            # The last chunk absorbs the division remainder so the whole
            # range [0, nmax) is covered.
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
        # 2.2. Consolidate results.  Iterating `futures` in submission order
        #      keeps the output list sorted.  NOTE: the original code called
        #      cf.as_completed(futures) without iterating the returned
        #      iterator — that is a no-op and has been removed.
        for future in futures:
            try:
                # .result() is what re-raises exceptions from the worker, so
                # the try block belongs here, not around list.append().
                found.extend(future.result())
            except Exception:
                print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent_submit():')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    # Benchmark driver: time the whole concurrent search from the caller side.
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    t0 = time()
    matches = _concurrent_submit(nmax, number, workers)
    elapsed = time() - t0
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(matches), elapsed))

concurrent.futures.ProcessPoolExecutor.map()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc

def _findmatch(listnumber, number):    
    '''Function to find the occurrence of number in another number and return
       a string value.'''
    #print('def _findmatch(listnumber, number):')
    #print('listnumber = {0} and ref = {1}'.format(listnumber, number))
    if number in str(listnumber):
        x = listnumber
        #print('x = {0}'.format(x))
        return x 

def _concurrent_map(nmax, number, workers):
    '''Find the occurrences of a given number in a number range in a
       parallelised manner using concurrent.futures.ProcessPoolExecutor.map.

       One map() call is issued per worker over its sub-range; each element
       of the range is shipped to the pool individually (chunksize=10000).

       Returns an ordered list of all numbers in [0, nmax) containing `number`.
    '''
    # 1. Local variables
    start = time()
    chunk = nmax // workers  # size of each worker's sub-range
    result_iters = []
    found = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool.
        for i in range(workers):
            cstart = chunk * i
            # Last chunk absorbs the remainder so the whole range is covered.
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            numberlist = range(cstart, cstop)
            # executor.map returns an iterator over results, not a Future.
            result_iters.append(executor.map(_findmatch, numberlist,
                                             itertools.repeat(number),
                                             chunksize=10000))
        # 2.3. Consolidate result as a list and return this list.
        for result_iter in result_iters:
            try:
                # Worker exceptions are re-raised while iterating the map
                # iterator, so the try block must wrap the iteration — the
                # original wrapped only list.append(), which cannot fail.
                for f in result_iter:
                    # `is not None` (not truthiness) so a legitimate match of
                    # 0 is kept when searching for the digit '0'.
                    if f is not None:
                        found.append(f)
            except Exception:
                print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    # Benchmark driver for the map-based concurrent search.
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    t0 = time()
    matches = _concurrent_map(nmax, number, workers)
    elapsed = time() - t0
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(matches), elapsed))

序列号:

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

from time import time

def _serial(nmax, number):    
    start = time()
    match=[]
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):match.append(n)
    end=time()-start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

if __name__ == '__main__':
    # Control benchmark: the same task executed serially.
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.

    t0 = time()
    matches = _serial(nmax, number)
    elapsed = time() - t0
    print('\n main')
    print("found {0} in {1:.4f}sec".format(len(matches), elapsed))

2017 年 2 月 13 日更新:

除了@niemmi 的回答外,我还根据一些个人研究提供了一个答案:

  1. 如何进一步加速 @niemmi 的 .map() 和 .submit() 解决方案,以及
  2. 何时 ProcessPoolExecutor.map() 比 ProcessPoolExecutor.submit() 更快。

您在这里将苹果与橙子进行比较。使用 map 时,您会生成所有 1E8 数字并将它们传输到工作进程。与实际执行相比,这需要花费大量时间。使用 submit 时,您只需创建 6 组参数即可传输。

如果您更改 map 以使用相同的原理操作,您将获得彼此接近的数字:

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_map(nmax, number, workers):
    '''Find the occurrences of a given number in a number range in a
       parallelised manner using ProcessPoolExecutor.map, shipping only one
       (cstart, cstop) argument pair per worker instead of every number.

       Returns an ordered list of all numbers in [0, nmax) containing `number`.
    '''
    # 1. Local variables
    start = time()
    chunk = nmax // workers  # size of each worker's sub-range
    found = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload: one (cstart, cstop) pair per worker; the
        #      last stop is clamped to nmax to cover the division remainder.
        cstart = (chunk * i for i in range(workers))
        cstop = (chunk * i if i != workers else nmax for i in range(1, workers + 1))
        results = executor.map(_findmatch, cstart, cstop, itertools.repeat(number))

        # 2.3. Consolidate the per-chunk lists into one flat list.
        try:
            # Worker exceptions are re-raised while iterating the map
            # iterator, so the try must wrap the iteration — the original
            # wrapped only list.append(), which cannot raise here.  The dead
            # `futures = []` initialisation was also removed.
            for sublist in results:
                found.extend(sublist)
        except Exception:
            print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

您可以通过正确使用 as_completed 来提高提交的性能。对于给定的可迭代期货,它将 return 一个迭代器,它将 yield 期货按照它们完成的顺序进行。

您也可以跳过将数据复制到另一个数组并使用 itertools.chain.from_iterable 将 futures 的结果合并为单个可迭代对象:

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc
from itertools import chain

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_map(nmax, number, workers):
    '''Find the occurrences of a given number in a number range in a
       parallelised manner, returning one lazy iterable over all matches.

       NOTE: despite the historical name, this version uses
       ProcessPoolExecutor.submit(), not .map(); results are yielded in
       completion order (unordered).
    '''
    # 1. Local variables
    chunk = nmax // workers  # size of each worker's sub-range
    futures = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool; the last chunk
        #      is extended to nmax to cover the division remainder.
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))

    # Flatten the per-future result lists without copying them into another
    # list; as_completed yields each future as soon as it finishes.  (The
    # original declared an unused `found = []`, removed here.)
    return chain.from_iterable(f.result() for f in cf.as_completed(futures))

if __name__ == '__main__':
    # Benchmark driver: the result is a lazy iterable, so count it with a
    # generator expression instead of len().
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    t0 = time()
    matches = _concurrent_map(nmax, number, workers)
    elapsed = time() - t0
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(sum(1 for x in matches), elapsed))

概览:

我的回答分为两部分:

  • 第 1 部分展示了如何从@niemmi 的 ProcessPoolExecutor.map() 解决方案中获得更多 speed-up。
  • 第 2 部分显示 ProcessPoolExecutor 的子类 .submit().map() 产生 non-equivalent 计算时间。

====================================== ===============================

第 1 部分:更多 Speed-up ProcessPoolExecutor.map()

背景: 本节基于@niemmi 的 .map() 解决方案,该解决方案本身非常出色。在对他的离散化方案进行一些研究以更好地理解它如何与 .map() 块大小争论相互作用时,我发现了这个有趣的解决方案。

我认为 @niemmi 对 chunk = nmax // workers 的定义就是 chunksize 的定义,即由工作池中的每个工作人员处理的较小的实际数字范围(给定任务)。这个定义的前提是假设:如果一台计算机有 x 个工作人员,将任务平均分配给每个工作人员将导致每个工作人员的最佳使用,因此总任务将以最快的速度完成。因此,将给定任务分解成的块数应始终等于池工作人员的数量。然而,这个假设是否正确?

命题: 在这里,我提出上述假设在与ProcessPoolExecutor.map()一起使用时并不总是导致最快的计算时间。相反,将任务离散化到大于池工作人员数量的数量可以导致 speed-up,即更快地完成给定任务

实验: 我修改了 @niemmi 的代码,允许离散化任务的数量超过池工作人员的数量。下面给出了这段代码,用于计算数字 5 在 0 到 1E8 的数字范围中出现的次数。我已经使用 1、2、4 和 6 个池工作人员执行了这段代码,并针对离散化任务数量与池工作人员数量的各种比率。对于每个场景,进行 3 次运行并将计算时间制成表格。"Speed-up" 在这里定义为:使用与池工作人员数量相等的块数时的平均计算时间,相对于离散化任务数量大于池工作人员数量时的平均计算时间的比值。

调查结果:

  1. 左图显示了实验部分提到的所有场景所花费的计算时间。它表明 块数 / 工人数 = 1 所花费的 计算时间总是大于 块数 > 所花费的计算时间工人数量. 也就是说,前者的效率总是低于后者。

  2. 右图显示,当块数 / 工作人员数之比达到阈值 14 或更多时,可获得 1.2 倍以上的加速。有趣的是,当 ProcessPoolExecutor.map() 仅由 1 个 worker 执行时,这种加速趋势也会出现。

结论: 当自定义 ProcessPoolExecutor.map() 用于解决给定任务的离散任务数量时,谨慎的做法是确保这个数字大于池工作人员的数量,因为这种做法可以缩短计算时间。

concurrent.futures.ProcessPoolExecutor.map() 代码。 (仅修订部分)

def _concurrent_map(nmax, number, workers, num_of_chunks):
    '''Find the occurrences of `number` in [0, nmax) with
       ProcessPoolExecutor.map, discretising the range into `num_of_chunks`
       tasks spread over `workers` processes.

       Returns an ordered list of all numbers in [0, nmax) containing `number`.
    '''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks  # numbers handled per discretised task
    found = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload: the last stop is clamped to nmax so the
        #      remainder of an uneven division is still searched.
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        results = executor.map(_findmatch, cstart, cstop,
                               itertools.repeat(number))
        # 2.2. Consolidate per-chunk result lists into one flat list.
        try:
            # Worker exceptions surface while iterating the map iterator, so
            # the try wraps the iteration (the original wrapped only
            # list.append(), which cannot raise).  extend() also fixes the
            # original `if f:` filter, which silently dropped a match of 0.
            for sublist in results:
                found.extend(sublist)
        except Exception:
            print_exc()
        foundsize = len(found)
        end = time() - start
        print('\n within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    # Benchmark driver: discretise into num_of_chunks tasks, more than the
    # number of pool workers.
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 4     # Pool of workers
    chunks_vs_workers = 14 # A factor of =>14 can provide optimum performance  
    num_of_chunks = chunks_vs_workers * workers

    t0 = time()
    matches = _concurrent_map(nmax, number, workers, num_of_chunks)
    elapsed = time() - t0
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(matches), elapsed))

====================================== ===============================

第 2 部分:当 returning sorted/ordered 结果列表时,使用 ProcessPoolExecutor 子类 .submit() 和 .map() 的总计算时间可能不同。

背景: 我修改了 .submit().map() 代码以允许 "apple-to-apple" 比较它们的计算时间和能力可视化主代码的计算时间,主代码执行并发操作调用的_concurrent方法的计算时间,以及_concurrent方法调用的每个离散化task/worker的计算时间。此外,这些代码中的并发方法被构造为 return 直接来自 .submit() 的未来对象和 .map() 的迭代器的结果的无序列表和有序列表。下面提供了源代码(希望对您有所帮助。)。

实验 这两个新改进的代码用于执行第 1 部分中描述的相同实验,不同的是只考虑了 6 个池工作人员和 python built-in listsorted 方法分别用于 return 代码主要部分的结果的无序列表和有序列表。

调查结果:

  1. 从 _concurrent 方法的结果中,我们可以看到用于创建 ProcessPoolExecutor.submit() 的所有 Future 对象和创建 ProcessPoolExecutor.map() 的迭代器的 _concurrent 方法的计算时间作为一个函数的数量池工作人员数量上的离散任务是等效的。这个结果只是意味着 ProcessPoolExecutor sub-classes .submit().map() 等同于 efficient/fast.
  2. 比较 main 和它的 _concurrent 方法的计算时间,我们可以看到 main 运行 比它的 _concurrent 方法长。这是意料之中的,因为它们的时间差反映了 listsorted 方法(以及包含在这些方法中的其他方法)的计算时间量。很明显,list 方法比 sorted 方法花费更少的计算时间来 return 结果列表。 .submit() 和 .map() 代码的 list 方法的平均计算时间相似,约为 0.47 秒。 .submit() 和 .map() 代码的排序方法的平均计算时间分别为 1.23 秒和 1.01 秒。换句话说,对于 .submit() 和 .map() 代码,list 方法分别比 sorted 方法快 2.62 倍和 2.15 倍。
  3. 不清楚为什么 sorted 方法从 .map().submit() 快,因为离散化的数量 tasks 增加超过 pool workers 的数量,save when 离散化任务的数量等于池工作人员的数量。 也就是说,这些发现表明,使用同样快的 .submit().map() sub-classes 的决定可能会受到 sorted 方法的阻碍。例如,如果目的是在尽可能短的时间内生成有序列表,则应优先使用 ProcessPoolExecutor.map() 而不是 ProcessPoolExecutor.submit(),因为 .map() 可以允许最短的总计算时间.
  4. 我的回答第 1 部分中提到的离散化方案在这里显示 speed-up .submit().map() sub-classes 的性能。 speed-up 的数量可以比离散化任务数量等于 pool worker 数量的情况高出 20%。

改进的 .map() 代码

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from itertools import repeat, chain 


def _findmatch(nmin, nmax, number):
    '''Function to find the occurence of number in range nmin to nmax and return
       the found occurences in a list.'''
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
    #      format(nmin, nmax, number, len(match),end))
    return match

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Concurrently find the occurrences of a given number in a number range
       using concurrent.futures.ProcessPoolExecutor.map, splitting the range
       into num_of_chunks tasks over a pool of `workers` processes.'''
    # 1. Local variables
    begin = time()
    chunksize = nmax // num_of_chunks
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool; the final stop
        #      is clamped to nmax to cover the division remainder.
        chunk_starts = (chunksize * i for i in range(num_of_chunks))
        chunk_stops = (chunksize * i if i != num_of_chunks else nmax
                       for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, chunk_starts, chunk_stops,
                               repeat(number))
    elapsed = time() - begin
    print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(elapsed))
    return list(chain.from_iterable(futures)) #Return an unordered result list
    #return sorted(chain.from_iterable(futures)) #Return an ordered result list

if __name__ == '__main__':
    # Benchmark driver for the improved .map() code.
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers
    chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance 
    num_of_chunks = chunks_vs_workers * workers

    t0 = time()
    found = _concurrent(nmax, number, workers, num_of_chunks)
    elapsed = time() - t0
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    #print('found = ', found)
    print("found {0} in {1:.4f}sec".format(len(found), elapsed))    

改进了 .submit() 代码。
此代码与 .map 代码相同,只是您将 _concurrent 方法替换为以下内容:

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurrences of a given number in a number range in a concurrent
       manner.

       Returns an unordered flat list of every match; see the commented
       alternative for a sorted list.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks  # numbers handled per discretised task
    futures = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool; the last chunk
        #      is extended to nmax to cover any division remainder.
        for i in range(num_of_chunks):
            cstart = chunksize * i
            cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
    end = time() - start
    print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    return list(chain.from_iterable(f.result() for f in cf.as_completed(
        futures))) #Return an unordered list
    # BUGFIX: the original "ordered" alternative was a verbatim copy of the
    # unordered return; producing an ordered list requires sorted(), as below.
    #return sorted(chain.from_iterable(f.result() for f in cf.as_completed(
    #    futures))) #Return an ordered list

====================================== ===============================