python 关于工人刷新的多处理池通知

python multiprocessing pool notification on worker refreshing

我正在使用 Python 2.7 的 multiprocessing.Pool 来管理一个由 3 个工人组成的池。每个 worker 都相当复杂,并且在连续运行 6-8 小时后,某些第三方代码中存在资源泄漏(大概)会导致问题。所以我想使用 maxtasksperchild 定期刷新工作人员。

我还希望每个工作人员写入自己单独的日志文件。没有 maxtasksperchild 我使用共享 multiprocessing.Value 为每个工作人员分配一个整数(0、1 或 2),然后使用该整数命名日志文件。

对于 maxtasksperchild 我想在工作人员完成后重新使用日志文件。因此,如果整个过程运行一个月,我只需要三个日志文件,而不是每个产生的工人一个日志文件。

如果我可以传递一个回调(例如 finalizer 以配合当前支持的 initializer),这将很简单。没有它,我看不到一个强大而简单的方法来做到这一点。

这是 AFAIK 未记录的,但是 multiprocessingFinalizer class、"which supports object finalization using weakrefs"。您可以使用它在 initializer 中注册终结器。

不过,在这种情况下,我没有看到 multiprocessing.Value 有用的同步选择。多个工作人员可以同时退出,发出哪些文件整数是空闲的信号比一个(锁定的)计数器可以提供的要多。

我建议使用多个裸 multiprocessing.Locks,每个文件一个,而不是:

from multiprocessing import Pool, Lock, current_process
from multiprocessing.util import Finalize


def f(n):
    global fileno
    for _ in range(int(n)):  # xrange for Python 2
        pass
    return fileno


def init_fileno(file_locks):
    for i, lock in enumerate(file_locks):
        if lock.acquire(False):  # non-blocking attempt
            globals()['fileno'] = i
            print("{} using fileno: {}".format(current_process().name, i))
            Finalize(lock, lock.release, exitpriority=15)
            break


if __name__ == '__main__':

    n_proc = 3
    file_locks = [Lock() for _ in range(n_proc)]

    pool = Pool(
        n_proc, initializer=init_fileno, initargs=(file_locks,),
        maxtasksperchild=2
    )

    print(pool.map(func=f, iterable=[50e6] * 18))
    pool.close()
    pool.join()
    # all locks should be available if all finalizers did run
    assert all(lock.acquire(False) for lock in file_locks)

输出:

ForkPoolWorker-1 using fileno: 0
ForkPoolWorker-2 using fileno: 1
ForkPoolWorker-3 using fileno: 2
ForkPoolWorker-4 using fileno: 0
ForkPoolWorker-5 using fileno: 1
ForkPoolWorker-6 using fileno: 2
[0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2]

Process finished with exit code 0

请注意,在 Python 3 中,您不能可靠地使用 Pool 的上下文管理器来代替上面显示的旧方法。池的上下文管理器(不幸的是)调用 terminate(),这可能会在 终结器有机会 运行.

之前杀死工作进程

我最终选择了以下内容。它假定 PID 不会很快回收(对我来说 Ubuntu 是正确的,但在 Unix 上通常不是)。我不认为它做出任何其他假设,但我真的只是对 Ubuntu 感兴趣,所以我没有仔细查看其他平台,例如 Windows。

该代码使用一个数组来跟踪哪些 PID 声明了哪个索引。然后当一个新的 worker 启动时,它会查看是否有任何 PID 不再被使用。如果它找到一个,它假定这是因为工人已经完成了它的工作(或由于其他原因被终止)。如果找不到,那我们就不走运了!所以这并不完美,但我认为它比我迄今为止看到或考虑过的任何东西都简单。

def run_pool():
    child_pids = Array('i', 3)
    pool = Pool(3, initializser=init_worker, initargs=(child_pids,), maxtasksperchild=1000)

def init_worker(child_pids):
    with child_pids.get_lock():
        available_index = None
        for index, pid in enumerate(child_pids):
            # PID 0 means unallocated (this happens when our pool is started), we reclaim PIDs
            # which are no longer in use. We also reclaim the lucky case where a PID was recycled
            # but assigned to one of our workers again, so we know we can take it over
            if not pid or not _is_pid_in_use(pid) or pid == os.getpid():
                available_index = index
                break

        if available_index is not None:
            child_pids[available_index] = os.getpid()
        else:
            # This is unexpected - it means all of the PIDs are in use so we have a logical error
            # or a PID was recycled before we could notice and reclaim its index
            pass

def _is_pid_in_use(pid):
    try:
        os.kill(pid, 0)
        return True
    except OSError:
        return False