工作池的优雅终止

Graceful Termination of Worker Pool

我想产生 X 数量的 Pool worker 并给他们每个人 X% 的工作要做。我的问题是这项工作大约需要 20 分钟才能完成,每个额外的过程 运行ning 需要更长的时间,由于所完成的计算类型不同,我的答案可能会在几分钟或几小时内找到。我想做的是为单个工作人员实施某种方式 "HEY I FOUND IT" 并使用该信号杀死池的其余部分并继续我的计算。

要点:

我也考虑过使用队列,但它不会成功,因为我传递给每个队列的工作范围已经内置到函数的参数中。

下面是我正在处理的一个非常简单的版本(我正在处理的计算可能需要几个小时才能完成超过 42 亿个复杂的可迭代对象。)

def doWork():
    workers = Pool(2)
    results = workers.starmap_async( func = distSearch , iterable = Sections1_5,  callback = killPool )
    workers.close()
    print("Found answer : {}".format(results.get()))
    workers.join()

def killPool():
    workers.terminate()
    print("Worker Pool Terminated")

我可能应该指定我的进程仅在找到答案时 returns 否则它只会在完成后退出。我查看了 this 线程,但它让我完全迷失了方向,似乎需要大量开销才能持续检查获胜条件,而这应该出现在工作池的 return/callback 中。

我找到的所有答案都会通过监督工作人员池产生大量开销,我正在寻找一种解决方案,在工作人员级别自动发出终止信号。

I'm looking for a solution that sources the kill signal at the worker level, autonomously.

据我所知,那不存在。 Pool 对象(如 Pool.terminate)的方法应该 在创建池的进程中使用。

您可以使用 Pool.imap_unordered。这个 returns 父进程 中的迭代器 对结果进行迭代,一旦结果可用就会产生结果。一旦弹出所需的结果,您就可以使用 Pool.terminate().

编辑:

  • 从查看 3.5 实现 starmap_async returns 一个 MapResult 实例,它 不是 迭代器。
  • 您可以将多个输入包装在一个元组中,然后在这些输入的列表上使用 imap_unordered