定期重启 Python 多处理池

Periodically restart Python multiprocessing pool

我有一个 Python 多处理池做一个很长的工作,即使经过彻底的调试也不够健壮,不会每 24 小时左右失败一次,因为它依赖于许多第三方,非Python 具有复杂交互的工具。另外,底层机器有一些我无法控制的问题。请注意,失败并不是指整个程序崩溃,而是部分或大部分进程由于某些错误而闲置,并且应用程序本身挂起或仅使用未失败的进程继续工作。

我现在的解决方案是定期手动终止作业,然后从原来的位置重新启动。

即使不理想,我现在想做的是:从 Python 代码本身以编程方式定期重启多处理池。我真的不在乎这是否意味着在工作中杀死泳池工人。哪种方法最好?

我的代码如下:

with Pool() as p:
    for _ in p.imap_unordered(function, data):
        save_checkpoint()
        log()

我的想法是这样的:

start = 0
end = 1000  # magic number
while start + 1 < len(data):
    current_data = data[start:end]
    with Pool() as p:
        for _ in p.imap_unordered(function, current_data):
            save_checkpoint()
            log()
            start += 1
            end += 1

或:

start = 0
end = 1000  # magic number
while start + 1 < len(data):
    current_data = data[start:end]
    start_timeout(time=TIMEOUT) # which would be the best way to to do that without breaking multiprocessing?
    try:
        with Pool() as p:
            for _ in p.imap_unordered(function, current_data):
                save_checkpoint()
                log()
                start += 1
                end += 1
    except Timeout:
        pass
    

或者您认为更好的任何建议。任何帮助将不胜感激,谢谢!

您当前代码的问题在于它直接迭代多处理结果,并且该调用将阻塞。幸运的是,有一个简单的解决方案:完全按照 in the docs 的建议使用 apply_async。但是由于您在这里描述用例和失败的方式,我对其进行了一些调整。首先是模拟任务:

from multiprocessing import Pool, TimeoutError, cpu_count
from time import sleep
from random import randint


def log():
    print("logging is a dangerous activity: wear a hard hat.")


def work(d):
    sleep(randint(1, 100) / 100)
    print("finished working")
    if randint(1, 10) == 1:
        print("blocking...")
        while True:
            sleep(0.1)

    return d

此工作函数将以 0.1 的概率失败,无限期阻塞。我们创建任务:

data = list(range(100))
nproc = cpu_count()

然后为所有这些生成期货:

while data:
    print(f"== Processing {len(data)} items. ==")
    with Pool(nproc) as p:
        tasks = [p.apply_async(work, (d,)) for d in data]

那我们可以尝试手动把任务搞出来:

        for task in tasks:
            try:
                res = task.get(timeout=1)
                data.remove(res)
                log()
            except TimeoutError:
                failed.append(task)
                if len(failed) < nproc:
                    print(
                        f"{len(failed)} processes are blocked,"
                        f" but {nproc - len(failed)} remain."
                    )
                else:
                    break

这里的控制超时是到.get的超时。它应该与您期望最长的过程花费的时间一样长。请注意,我们检测到整个池何时被捆绑并放弃。

但是由于在您描述的场景中某些线程将比其他线程花费更长的时间,我们可以给 'failed' 进程一些时间来恢复。因此,每次任务失败时,我们都会快速检查其他任务是否真的成功了:

            for task in failed:
                try:
                    res = task.get(timeout=0.01)
                    data.remove(res)
                    failed.remove(task)
                    log()
                except TimeoutError:
                    continue

这是否适合您的情况取决于您的任务是否真的像我猜测的那样不稳定。

退出池的上下文管理器将终止池,因此我们甚至不需要自己处理。如果您有显着变化,您可能需要增加池大小(从而增加允许暂停的任务数量)或在考虑任务之前允许任务有一个宽限期 'failed'.