Python: 在生成器中并行执行 yield 表达式
Python: parallel execution of yield expressions in generator
我有一个生成器函数,它迭代大量参数并使用此参数生成另一个函数的结果。内部函数可能有相当长的执行时间,所以我想使用多处理来加速进程。也许这很重要,我也希望能够在执行过程中停止这个生成器。但我不确定实现这种逻辑的正确方法是什么。我需要像队列这样的东西,能够在旧任务完成后添加新任务,并在它们准备好后立即产生结果。我查看了 multiprocessing.Queue,但乍一看似乎不适合我的情况。可能有人可以建议我在这种情况下应该使用什么?
这是我的任务的大概代码:
def gen(**kwargs):
for param in get_params():
yield inner_func(param)
使用 multiprocessing.pool.Pool
class 进行多重处理,因为它的 terminate
方法将取消所有 运行ning 任务以及计划到 运行 的任务( concurrent.futures
模块终止方法不会取消已经 运行ning 任务)。正如@MisterMiyakgi 指出的那样,没有必要使用发电机。但是,您应该使用 imap_unordered
方法,该方法 returns 一个 iterable 可以迭代并允许您获得由 inner_function
,而如果您要使用 map
,您将无法获得第一个生成的值,直到生成所有值。
from multiprocessing import Pool
def get_params():
""" Generator function. """
# For example:
for next_param in range(10):
yield next_param
def inner_function(param):
""" Long running function. """
# For example:
return param ** 2
def gen():
pool = Pool()
# Use imap_unordered if we do not care about the order of results else imap:
iterable = pool.imap_unordered(inner_function, get_params())
# The iterable can be iterated as if it were a generator
# Add terminate method to iterable:
def terminate():
pool.terminate()
pool.close()
pool.join()
iterable.terminate = terminate
return iterable
# Usage:
# Required for Windows
if __name__ == '__main__':
iterable = gen()
# iterable.terminate() should be called when done iterating the iterable
# but it can be called any time to kill all running tasks and scheduled tasks.
# After calling terminate() do not further iterate the iterable.
for result in iterable:
print(result)
if result == 36:
iterable.terminate() # kill all remaining tasks, if any
break
打印:
0
1
4
9
16
25
36
我有一个生成器函数,它迭代大量参数并使用此参数生成另一个函数的结果。内部函数可能有相当长的执行时间,所以我想使用多处理来加速进程。也许这很重要,我也希望能够在执行过程中停止这个生成器。但我不确定实现这种逻辑的正确方法是什么。我需要像队列这样的东西,能够在旧任务完成后添加新任务,并在它们准备好后立即产生结果。我查看了 multiprocessing.Queue,但乍一看似乎不适合我的情况。可能有人可以建议我在这种情况下应该使用什么?
这是我的任务的大概代码:
def gen(**kwargs):
for param in get_params():
yield inner_func(param)
使用 multiprocessing.pool.Pool
class 进行多重处理,因为它的 terminate
方法将取消所有 运行ning 任务以及计划到 运行 的任务( concurrent.futures
模块终止方法不会取消已经 运行ning 任务)。正如@MisterMiyakgi 指出的那样,没有必要使用发电机。但是,您应该使用 imap_unordered
方法,该方法 returns 一个 iterable 可以迭代并允许您获得由 inner_function
,而如果您要使用 map
,您将无法获得第一个生成的值,直到生成所有值。
from multiprocessing import Pool
def get_params():
""" Generator function. """
# For example:
for next_param in range(10):
yield next_param
def inner_function(param):
""" Long running function. """
# For example:
return param ** 2
def gen():
pool = Pool()
# Use imap_unordered if we do not care about the order of results else imap:
iterable = pool.imap_unordered(inner_function, get_params())
# The iterable can be iterated as if it were a generator
# Add terminate method to iterable:
def terminate():
pool.terminate()
pool.close()
pool.join()
iterable.terminate = terminate
return iterable
# Usage:
# Required for Windows
if __name__ == '__main__':
iterable = gen()
# iterable.terminate() should be called when done iterating the iterable
# but it can be called any time to kill all running tasks and scheduled tasks.
# After calling terminate() do not further iterate the iterable.
for result in iterable:
print(result)
if result == 36:
iterable.terminate() # kill all remaining tasks, if any
break
打印:
0
1
4
9
16
25
36