`multiprocessing` 与 `concurrent.futures` 中的 Max Workers
Max Workers in `multiprocessing` vs `concurrent.futures`
在Python 3.8 中,concurrent.futures.ProcessPoolExecutor
已更新,将Windows 上可使用的最大worker(进程)数限制为61 个。原因请参见this and this,但据我了解:
- 在 Windows 上,
multiprocessing
调用 Windows API 函数 WaitForMultipleObjects
,用于等待进程完成。它最多可以等待 63 个对象,减去结果队列 reader 和线程唤醒 reader,因此有 61 个限制。 (即 Windows 每个进程使用一个线程来跟踪进程)。
(另见 this SO issue)
multiprocessing
,但是,仍然使用os.cpu_count()
。它首先抛出 Value Error
,但随后继续并使用了我 100% 的 CPU 核心。例如,
Exception in thread Thread-N:
Traceback (most recent call last):
File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\threading.py", line 932, in _bootstrap_inner
self.run()
File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\pool.py", line 519, in _handle_workers
cls._wait_for_updates(current_sentinels, change_notifier)
File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\pool.py", line 499, in _wait_for_updates
wait(sentinels, timeout=timeout)
File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\connection.py", line 879, in wait
ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\connection.py", line 811, in _exhaustive_wait
res = _winapi.WaitForMultipleObjects(L, False, timeout)
ValueError: need at most 63 handles, got a sequence of length 98
我的机器有 96 个内核。这个“错误”真的是错误吗?如果不是,我是否应该只使用 multiprocessing
模块而不是 concurrent.futures
模块,这将我的 CPU 使用限制为 61 个内核?
编辑: 我怀疑这是一个错误,因为我认为 multiprocess
将继续等待抛出错误的进程完成。如果我不限制核心数量,这似乎会发生(程序在 CPU 使用结束后挂起)。不过,我不确定是不是真的。
你的问题很好。查看代码,这似乎是一个不可恢复的错误。但在我看来,ThreadPoolExecutor
中会有代码将 Windows 下的池大小限制为 61,而不对 multiprocessing.Pool
class 强制执行,这在我看来是不可理解的。无论如何,用下面的程序检查应该很容易。如果它不打印 Done! 并挂起,我会说肯定有问题,如果您使用 multiprocessing.Pool
:[=15=,您应该明确限制池大小]
import multiprocessing
def worker(x):
return x ** 2
def main():
pool = multiprocessing.Pool(96)
results = pool.map(worker, range(96))
assert len(results) == 96
pool.close()
pool.join()
print('Done!')
if __name__ == '__main__':
main()
但是您的程序挂起这一事实是相当确定的,上面的程序将挂起,我怀疑您甚至不会到达 assert
语句。无论哪种方式,使用大于 61 的池大小都是不可靠的。
在Python 3.8 中,concurrent.futures.ProcessPoolExecutor
已更新,将Windows 上可使用的最大worker(进程)数限制为61 个。原因请参见this and this,但据我了解:
- 在 Windows 上,
multiprocessing
调用 Windows API 函数WaitForMultipleObjects
,用于等待进程完成。它最多可以等待 63 个对象,减去结果队列 reader 和线程唤醒 reader,因此有 61 个限制。 (即 Windows 每个进程使用一个线程来跟踪进程)。
(另见 this SO issue)
multiprocessing
,但是,仍然使用os.cpu_count()
。它首先抛出 Value Error
,但随后继续并使用了我 100% 的 CPU 核心。例如,
Exception in thread Thread-N:
Traceback (most recent call last):
File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\threading.py", line 932, in _bootstrap_inner
self.run()
File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\pool.py", line 519, in _handle_workers
cls._wait_for_updates(current_sentinels, change_notifier)
File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\pool.py", line 499, in _wait_for_updates
wait(sentinels, timeout=timeout)
File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\connection.py", line 879, in wait
ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
File "C:\Users\username\AppData\Local\Programs\Python\Python38\lib\multiprocessing\connection.py", line 811, in _exhaustive_wait
res = _winapi.WaitForMultipleObjects(L, False, timeout)
ValueError: need at most 63 handles, got a sequence of length 98
我的机器有 96 个内核。这个“错误”真的是错误吗?如果不是,我是否应该只使用 multiprocessing
模块而不是 concurrent.futures
模块,这将我的 CPU 使用限制为 61 个内核?
编辑: 我怀疑这是一个错误,因为我认为 multiprocess
将继续等待抛出错误的进程完成。如果我不限制核心数量,这似乎会发生(程序在 CPU 使用结束后挂起)。不过,我不确定是不是真的。
你的问题很好。查看代码,这似乎是一个不可恢复的错误。但在我看来,ThreadPoolExecutor
中会有代码将 Windows 下的池大小限制为 61,而不对 multiprocessing.Pool
class 强制执行,这在我看来是不可理解的。无论如何,用下面的程序检查应该很容易。如果它不打印 Done! 并挂起,我会说肯定有问题,如果您使用 multiprocessing.Pool
:[=15=,您应该明确限制池大小]
import multiprocessing
def worker(x):
return x ** 2
def main():
pool = multiprocessing.Pool(96)
results = pool.map(worker, range(96))
assert len(results) == 96
pool.close()
pool.join()
print('Done!')
if __name__ == '__main__':
main()
但是您的程序挂起这一事实是相当确定的,上面的程序将挂起,我怀疑您甚至不会到达 assert
语句。无论哪种方式,使用大于 61 的池大小都是不可靠的。