使用 concurrent.futures 每秒并行调用一个 fn

Using concurrent.futures to call a fn in parallel every second

我一直在努力掌握如何使用 concurrent.futures 每秒调用一个函数 3 次,而无需等待它到 return。我会在完成所有需要拨打的电话后收集结果。

这是我目前所在的位置,令我惊讶的是此示例函数中的 sleep() 阻止我的代码启动下一个 3 函数调用块。显然我对这里的文档理解不够:)

def print_something(thing):
    print(thing)
    time.sleep(10)

# define a generator 
def chunks(l, n):
    """Yield successive n-sized chunks from l."""    
    for i in range(0, len(l), n):
        yield l[i:i + n]

def main():    
    chunk_number = 0
    alphabet = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
    for current_chunk in chunks(alphabet, 3):  # Restrict to calling the function 3 times per second
        with ProcessPoolExecutor(max_workers=3) as executor:        
            futures = { executor.submit(print_something, thing): thing for thing in current_chunk }
            chunk_number += 1            
            print('chunk %s' % chunk_number)
            time.sleep(1)            
        
    for result in as_completed(futures): 
        print(result.result())

此代码导致打印出 3 个块,每个块之间有 10 秒的休眠时间 chunk.How 我可以更改它以确保我在调用之前不会等待函数到 return下一批?

谢谢

首先,对于 for current_chunk in chunks(alphabet, 3): 的每次迭代,您都在创建一个新的 ProcessPoolExecutor 实例和 futures 字典实例来破坏前一个实例。所以最后一个循环 for result in as_completed(futures): 只会打印最后提交的块的结果。其次,我认为您挂起的原因是,由 with ProcessPoolExecutor(max_workers=3) as executor: 管理的块将不会终止,直到 executor 提交的任务完成并且至少需要 10 秒。因此,for current_chunk in chunks(alphabet, 3): 块的下一次迭代不会比每 10 秒执行一次更频繁。

另请注意,出于同样的原因,块 for result in as_completed(futures): 需要在 with ThreadPoolExecutor(max_workers=26) as executor: 块内移动。也就是说,如果它放在后面,它将在所有任务完成后才会执行,因此您将无法“在它们完成时”获得结果。

你需要做一些重新排列,如下所示(我也将 print_something 修改为 return 而不是 None。现在应该没有挂起,如果你有足够的工作人员 (26) 运行 正在提交的 26 个任务。我怀疑你的桌面(如果你在你的 PC 上 运行ning 这个)有 26 个核心来支持 26 并发 正在执行进程。但我注意到 print_something 只打印一个短字符串,然后休眠 10 秒,这允许它把它的处理器交给池中的另一个进程。所以,虽然 cpu-intensive任务,通过指定一个 max_workers 值大于您计算机上的实际物理 processors/cores 数量是没有什么好处的,在这种情况下它是可以的。但是当您有花费很少的任务时效率更高执行实际 Python 字节代码的时间是使用线程而不是进程,因为创建线程的成本远低于创建进程的成本。但是,当您执行 运行 任务时,线程是出了名的差宁大y 由 Python 字节代码组成,因为由于全局解释器锁 (GIL) 的序列化,此类代码无法并发执行。

您要研究的主题:全局解释器锁 (GIL) 和 Python 字节码执行

更新以使用线程:

所以我们应该用 26 个或更多 light-weight 线程替换 ThreadPoolExecutor 来替换 ProcessPoolExecutorconcurrent.futures 模块的美妙之处在于不需要更改其他代码。但最重要的是更改块结构并具有单个 executor.

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def print_something(thing):
    # NOT cpu-intensive, so threads should work well here
    print(thing)
    time.sleep(10)
    return thing # so there is a non-None result
    

# define a generator
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

def main():
    chunk_number = 0
    alphabet = ['a','b','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z']
    futures = {}
    with ThreadPoolExecutor(max_workers=26) as executor:
        for current_chunk in chunks(alphabet, 3):  # Restrict to calling the function 3 times per second
            futures.update({executor.submit(print_something, thing): thing for thing in current_chunk })
            chunk_number += 1
            print('chunk %s' % chunk_number)
            time.sleep(1)

        # needs to be within the executor block else it won't run until all futures are complete    
        for result in as_completed(futures):
            print(result.result())

if __name__ == '__main__':
    main()