Python: 在生成器中并行执行 yield 表达式

Python: parallel execution of yield expressions in generator

我有一个生成器函数,它迭代大量参数并使用此参数生成另一个函数的结果。内部函数可能有相当长的执行时间,所以我想使用多处理来加速进程。也许这很重要,我也希望能够在执行过程中停止这个生成器。但我不确定实现这种逻辑的正确方法是什么。我需要像队列这样的东西,能够在旧任务完成后添加新任务,并在它们准备好后立即产生结果。我查看了 multiprocessing.Queue,但乍一看似乎不适合我的情况。可能有人可以建议我在这种情况下应该使用什么?

这是我的任务的大概代码:

def gen(**kwargs):
    for param in get_params():
        yield inner_func(param)

使用 multiprocessing.pool.Pool class 进行多重处理,因为它的 terminate 方法将取消所有 运行ning 任务以及计划到 运行 的任务( concurrent.futures 模块终止方法不会取消已经 运行ning 任务)。正如@MisterMiyakgi 指出的那样,没有必要使用发电机。但是,您应该使用 imap_unordered 方法,该方法 returns 一个 iterable 可以迭代并允许您获得由 inner_function,而如果您要使用 map,您将无法获得第一个生成的值,直到生成所有值。

from multiprocessing import Pool

def get_params():
    """ Generator function. """
    # For example:
    for next_param in range(10):
        yield next_param

def inner_function(param):
    """ Long running function. """
    # For example:
    return param ** 2

def gen():
    pool = Pool()
    # Use imap_unordered if we do not care about the order of results else imap:
    iterable = pool.imap_unordered(inner_function, get_params())
    # The iterable can be iterated as if it were a generator
    # Add terminate method to iterable:
    def terminate():
        pool.terminate()
        pool.close()
        pool.join()
    iterable.terminate = terminate
    return iterable


# Usage:
# Required for Windows
if __name__ == '__main__':
    iterable = gen()
    # iterable.terminate() should be called when done iterating the iterable
    # but it can be called any time to kill all running tasks and scheduled tasks.
    # After calling terminate() do not further iterate the iterable.
    for result in iterable:
        print(result)
        if result == 36:
            iterable.terminate() # kill all remaining tasks, if any
            break

打印:

0
1
4
9
16
25
36