python 关于工人刷新的多处理池通知
python multiprocessing pool notification on worker refreshing
我正在使用 Python 2.7 的 multiprocessing.Pool
来管理一个由 3 个工人组成的池。每个 worker 都相当复杂,并且在连续运行 6-8 小时后,某些第三方代码中存在资源泄漏(大概)会导致问题。所以我想使用 maxtasksperchild
定期刷新工作人员。
我还希望每个工作人员写入自己单独的日志文件。没有 maxtasksperchild
我使用共享 multiprocessing.Value
为每个工作人员分配一个整数(0、1 或 2),然后使用该整数命名日志文件。
对于 maxtasksperchild
我想在工作人员完成后重新使用日志文件。因此,如果整个过程运行一个月,我只需要三个日志文件,而不是每个产生的工人一个日志文件。
如果我可以传递一个回调(例如 finalizer
以配合当前支持的 initializer
),这将很简单。没有它,我看不到一个强大而简单的方法来做到这一点。
这是 AFAIK 未记录的,但是 multiprocessing
有 Finalizer
class、"which supports object finalization using weakrefs"。您可以使用它在 initializer
中注册终结器。
不过,在这种情况下,我没有看到 multiprocessing.Value
有用的同步选择。多个工作人员可以同时退出,发出哪些文件整数是空闲的信号比一个(锁定的)计数器可以提供的要多。
我建议使用多个裸 multiprocessing.Lock
s,每个文件一个,而不是:
from multiprocessing import Pool, Lock, current_process
from multiprocessing.util import Finalize
def f(n):
global fileno
for _ in range(int(n)): # xrange for Python 2
pass
return fileno
def init_fileno(file_locks):
for i, lock in enumerate(file_locks):
if lock.acquire(False): # non-blocking attempt
globals()['fileno'] = i
print("{} using fileno: {}".format(current_process().name, i))
Finalize(lock, lock.release, exitpriority=15)
break
if __name__ == '__main__':
n_proc = 3
file_locks = [Lock() for _ in range(n_proc)]
pool = Pool(
n_proc, initializer=init_fileno, initargs=(file_locks,),
maxtasksperchild=2
)
print(pool.map(func=f, iterable=[50e6] * 18))
pool.close()
pool.join()
# all locks should be available if all finalizers did run
assert all(lock.acquire(False) for lock in file_locks)
输出:
ForkPoolWorker-1 using fileno: 0
ForkPoolWorker-2 using fileno: 1
ForkPoolWorker-3 using fileno: 2
ForkPoolWorker-4 using fileno: 0
ForkPoolWorker-5 using fileno: 1
ForkPoolWorker-6 using fileno: 2
[0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2]
Process finished with exit code 0
请注意,在 Python 3 中,您不能可靠地使用 Pool 的上下文管理器来代替上面显示的旧方法。池的上下文管理器(不幸的是)调用 terminate()
,这可能会在 终结器有机会 运行.
之前杀死工作进程
我最终选择了以下内容。它假定 PID 不会很快回收(对我来说 Ubuntu 是正确的,但在 Unix 上通常不是)。我不认为它做出任何其他假设,但我真的只是对 Ubuntu 感兴趣,所以我没有仔细查看其他平台,例如 Windows。
该代码使用一个数组来跟踪哪些 PID 声明了哪个索引。然后当一个新的 worker 启动时,它会查看是否有任何 PID 不再被使用。如果它找到一个,它假定这是因为工人已经完成了它的工作(或由于其他原因被终止)。如果找不到,那我们就不走运了!所以这并不完美,但我认为它比我迄今为止看到或考虑过的任何东西都简单。
def run_pool():
child_pids = Array('i', 3)
pool = Pool(3, initializser=init_worker, initargs=(child_pids,), maxtasksperchild=1000)
def init_worker(child_pids):
with child_pids.get_lock():
available_index = None
for index, pid in enumerate(child_pids):
# PID 0 means unallocated (this happens when our pool is started), we reclaim PIDs
# which are no longer in use. We also reclaim the lucky case where a PID was recycled
# but assigned to one of our workers again, so we know we can take it over
if not pid or not _is_pid_in_use(pid) or pid == os.getpid():
available_index = index
break
if available_index is not None:
child_pids[available_index] = os.getpid()
else:
# This is unexpected - it means all of the PIDs are in use so we have a logical error
# or a PID was recycled before we could notice and reclaim its index
pass
def _is_pid_in_use(pid):
try:
os.kill(pid, 0)
return True
except OSError:
return False
我正在使用 Python 2.7 的 multiprocessing.Pool
来管理一个由 3 个工人组成的池。每个 worker 都相当复杂,并且在连续运行 6-8 小时后,某些第三方代码中存在资源泄漏(大概)会导致问题。所以我想使用 maxtasksperchild
定期刷新工作人员。
我还希望每个工作人员写入自己单独的日志文件。没有 maxtasksperchild
我使用共享 multiprocessing.Value
为每个工作人员分配一个整数(0、1 或 2),然后使用该整数命名日志文件。
对于 maxtasksperchild
我想在工作人员完成后重新使用日志文件。因此,如果整个过程运行一个月,我只需要三个日志文件,而不是每个产生的工人一个日志文件。
如果我可以传递一个回调(例如 finalizer
以配合当前支持的 initializer
),这将很简单。没有它,我看不到一个强大而简单的方法来做到这一点。
这是 AFAIK 未记录的,但是 multiprocessing
有 Finalizer
class、"which supports object finalization using weakrefs"。您可以使用它在 initializer
中注册终结器。
不过,在这种情况下,我没有看到 multiprocessing.Value
有用的同步选择。多个工作人员可以同时退出,发出哪些文件整数是空闲的信号比一个(锁定的)计数器可以提供的要多。
我建议使用多个裸 multiprocessing.Lock
s,每个文件一个,而不是:
from multiprocessing import Pool, Lock, current_process
from multiprocessing.util import Finalize
def f(n):
global fileno
for _ in range(int(n)): # xrange for Python 2
pass
return fileno
def init_fileno(file_locks):
for i, lock in enumerate(file_locks):
if lock.acquire(False): # non-blocking attempt
globals()['fileno'] = i
print("{} using fileno: {}".format(current_process().name, i))
Finalize(lock, lock.release, exitpriority=15)
break
if __name__ == '__main__':
n_proc = 3
file_locks = [Lock() for _ in range(n_proc)]
pool = Pool(
n_proc, initializer=init_fileno, initargs=(file_locks,),
maxtasksperchild=2
)
print(pool.map(func=f, iterable=[50e6] * 18))
pool.close()
pool.join()
# all locks should be available if all finalizers did run
assert all(lock.acquire(False) for lock in file_locks)
输出:
ForkPoolWorker-1 using fileno: 0
ForkPoolWorker-2 using fileno: 1
ForkPoolWorker-3 using fileno: 2
ForkPoolWorker-4 using fileno: 0
ForkPoolWorker-5 using fileno: 1
ForkPoolWorker-6 using fileno: 2
[0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2, 0, 0, 1, 1, 2, 2]
Process finished with exit code 0
请注意,在 Python 3 中,您不能可靠地使用 Pool 的上下文管理器来代替上面显示的旧方法。池的上下文管理器(不幸的是)调用 terminate()
,这可能会在 终结器有机会 运行.
我最终选择了以下内容。它假定 PID 不会很快回收(对我来说 Ubuntu 是正确的,但在 Unix 上通常不是)。我不认为它做出任何其他假设,但我真的只是对 Ubuntu 感兴趣,所以我没有仔细查看其他平台,例如 Windows。
该代码使用一个数组来跟踪哪些 PID 声明了哪个索引。然后当一个新的 worker 启动时,它会查看是否有任何 PID 不再被使用。如果它找到一个,它假定这是因为工人已经完成了它的工作(或由于其他原因被终止)。如果找不到,那我们就不走运了!所以这并不完美,但我认为它比我迄今为止看到或考虑过的任何东西都简单。
def run_pool():
child_pids = Array('i', 3)
pool = Pool(3, initializser=init_worker, initargs=(child_pids,), maxtasksperchild=1000)
def init_worker(child_pids):
with child_pids.get_lock():
available_index = None
for index, pid in enumerate(child_pids):
# PID 0 means unallocated (this happens when our pool is started), we reclaim PIDs
# which are no longer in use. We also reclaim the lucky case where a PID was recycled
# but assigned to one of our workers again, so we know we can take it over
if not pid or not _is_pid_in_use(pid) or pid == os.getpid():
available_index = index
break
if available_index is not None:
child_pids[available_index] = os.getpid()
else:
# This is unexpected - it means all of the PIDs are in use so we have a logical error
# or a PID was recycled before we could notice and reclaim its index
pass
def _is_pid_in_use(pid):
try:
os.kill(pid, 0)
return True
except OSError:
return False