运行 IPython 中的 ProcessPoolExecutor
Running a ProcessPoolExecutor in IPython
我是 运行 我的 IPython 解释器(IPython 7.9.0,Python 3.8.0)和 运行 变成 st运行ge 错误。这是我输入的内容:
[In [1]: from concurrent.futures import ProcessPoolExecutor
[In [2]: executor=ProcessPoolExecutor(max_workers=1)
[In [3]: def func():
print('Hello')
[In [4]: future=executor.submit(func)
但是,我收到以下错误:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 116, in get
return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'func' on <module '__main__' (built-in)>
此外,尝试再次提交作业给了我一个不同的错误:
[In [5]: future=executor.submit(func)
---------------------------------------------------------------------------
BrokenProcessPool Traceback (most recent call last)
<ipython-input-5-42bad1a6fe80> in <module>
----> 1 future=executor.submit(func)
/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py in submit(*args, **kwargs)
627 with self._shutdown_lock:
628 if self._broken:
--> 629 raise BrokenProcessPool(self._broken)
630 if self._shutdown_thread:
631 raise RuntimeError('cannot schedule new futures after shutdown')
BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
作为健全性检查,我将(几乎)相同的代码输入到 Python 文件中,并从命令行 (python3 test.py
) 中输入 运行。效果很好。
为什么 IPython 我的测试有问题?
编辑:
这是工作正常的 Python 文件。
from concurrent.futures import ProcessPoolExecutor as Executor
def func():
print('Hello')
if __name__ == '__main__':
with Executor(1) as executor:
future=executor.submit(func)
print(future.result())
好的,终于知道是怎么回事了。问题是 Mac OS - 它默认使用 "spawn" 方法来创建子流程。这在此处 https://docs.python.org/3/library/multiprocessing.html 以及将其更改为 fork 的方式进行了解释(尽管它指出 fork 在 Mac os 上是不安全的)。
使用 spawn 方法会启动一个新的 Python 解释器,并将您的代码输入其中。然后这会尝试在 main 下找到您的函数,但在这种情况下没有 main 因为没有程序,只有解释的命令。
如果您将启动方法更改为 fork,您的代码就会运行(但请注意这是不安全的)
In [1]: import multiprocessing as mp
In [2]: mp.set_start_method("fork")
In [3]: def func():
...: print("foo");
...:
In [4]: from concurrent.futures import ProcessPoolExecutor
In [5]: executor=ProcessPoolExecutor(max_workers=1)
In [6]: future=executor.submit(func)
foo
In [7]:
我不确定答案是否有帮助,因为有警告,但它解释了为什么当你有一个程序(你的其他尝试)时它的行为不同以及为什么它在 Ubuntu 上运行良好 - 它使用"fork" 默认。
TLDR;
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
# create child processes using 'fork' context
executor = ProcessPoolExecutor(max_workers=1, mp_context=mp.get_context('fork'))
这实际上是由于 python MacOS 3.8 切换到“spawn”方法来创建子进程;而不是 3.8 之前默认的“fork”。以下是一些本质区别:
分叉:
- 克隆父进程的数据和代码,从而继承父程序的状态。
- 子进程对继承变量所做的任何修改都不会反映在父进程中这些变量的状态。从这一点开始,这些状态基本上是 forked(写时复制)。
- 父进程中导入的所有库都可以在子进程的上下文中使用。这也使此方法变得快速,因为子进程不必重新导入库(代码)和变量(数据)。
- 这带来了一些缺点,尤其是在分叉多线程程序方面。
- 一些带有 C 后端的库,如 Tensorflow、OpenCV 等,不是分叉安全的,会导致子进程以不确定的方式挂起。
产卵:
- 在不继承代码或数据的情况下为子进程创建一个新的解释器。
- 只有必要的 data/arguments 被发送到子进程。这意味着子进程不会自动使用变量、线程锁、文件描述符等——这避免了难以捕获的错误。
- 这种方法也有一些缺点 — 因为 data/arguments 需要发送到子进程,所以它们也必须是 pickle-able 的。某些具有内部 locks/mutex 的对象(例如队列)不可 pickle,并且 pickle 较重的对象(例如数据框和大型 numpy 数组)非常昂贵。
- 在子进程上取消选中对象将导致重新导入关联库(如果有)。这又增加了时间。
- 由于父代码未克隆到子进程中,因此在创建子进程时需要使用
if __name__ == '__main__'
守卫。不这样做会使子进程无法从父进程导入代码(现在 运行 为 main)。这也是为什么您的程序在与守卫一起使用时可以正常工作的原因。
如果您注意到 fork 会带来一些由您的程序或导入的非 fork 安全库引起的不可预测的影响,您可以:
- (a) 全局设置多处理上下文以使用 'fork' 方法:
import multiprocessing as mp
mp.set_start_method("fork")
请注意,这将设置全局上下文,您或任何其他导入的库一旦设置就无法更改此上下文。
- (b) 使用多处理的
get_context
方法在本地设置上下文:
import multiprocessing as mp
mp_fork = mp.get_context('fork')
# mp_fork has all the attributes of mp so you can do:
mp_fork.Process(...)
mp_fork.Pool(...)
# using local context will not change global behaviour:
# create child process using global context
# default is fork in < 3.8; spawn otherwise
mp.Process(...)
# most multiprocessing based functionality like ProcessPoolExecutor
# also take context as an argument:
executor=ProcessPoolExecutor(max_workers=1, mp_context=mp_fork)
我是 运行 我的 IPython 解释器(IPython 7.9.0,Python 3.8.0)和 运行 变成 st运行ge 错误。这是我输入的内容:
[In [1]: from concurrent.futures import ProcessPoolExecutor
[In [2]: executor=ProcessPoolExecutor(max_workers=1)
[In [3]: def func():
print('Hello')
[In [4]: future=executor.submit(func)
但是,我收到以下错误:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 116, in get
return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'func' on <module '__main__' (built-in)>
此外,尝试再次提交作业给了我一个不同的错误:
[In [5]: future=executor.submit(func)
---------------------------------------------------------------------------
BrokenProcessPool Traceback (most recent call last)
<ipython-input-5-42bad1a6fe80> in <module>
----> 1 future=executor.submit(func)
/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py in submit(*args, **kwargs)
627 with self._shutdown_lock:
628 if self._broken:
--> 629 raise BrokenProcessPool(self._broken)
630 if self._shutdown_thread:
631 raise RuntimeError('cannot schedule new futures after shutdown')
BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
作为健全性检查,我将(几乎)相同的代码输入到 Python 文件中,并从命令行 (python3 test.py
) 中输入 运行。效果很好。
为什么 IPython 我的测试有问题?
编辑:
这是工作正常的 Python 文件。
from concurrent.futures import ProcessPoolExecutor as Executor
def func():
print('Hello')
if __name__ == '__main__':
with Executor(1) as executor:
future=executor.submit(func)
print(future.result())
好的,终于知道是怎么回事了。问题是 Mac OS - 它默认使用 "spawn" 方法来创建子流程。这在此处 https://docs.python.org/3/library/multiprocessing.html 以及将其更改为 fork 的方式进行了解释(尽管它指出 fork 在 Mac os 上是不安全的)。
使用 spawn 方法会启动一个新的 Python 解释器,并将您的代码输入其中。然后这会尝试在 main 下找到您的函数,但在这种情况下没有 main 因为没有程序,只有解释的命令。
如果您将启动方法更改为 fork,您的代码就会运行(但请注意这是不安全的)
In [1]: import multiprocessing as mp
In [2]: mp.set_start_method("fork")
In [3]: def func():
...: print("foo");
...:
In [4]: from concurrent.futures import ProcessPoolExecutor
In [5]: executor=ProcessPoolExecutor(max_workers=1)
In [6]: future=executor.submit(func)
foo
In [7]:
我不确定答案是否有帮助,因为有警告,但它解释了为什么当你有一个程序(你的其他尝试)时它的行为不同以及为什么它在 Ubuntu 上运行良好 - 它使用"fork" 默认。
TLDR;
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
# create child processes using 'fork' context
executor = ProcessPoolExecutor(max_workers=1, mp_context=mp.get_context('fork'))
这实际上是由于 python MacOS 3.8 切换到“spawn”方法来创建子进程;而不是 3.8 之前默认的“fork”。以下是一些本质区别:
分叉:
- 克隆父进程的数据和代码,从而继承父程序的状态。
- 子进程对继承变量所做的任何修改都不会反映在父进程中这些变量的状态。从这一点开始,这些状态基本上是 forked(写时复制)。
- 父进程中导入的所有库都可以在子进程的上下文中使用。这也使此方法变得快速,因为子进程不必重新导入库(代码)和变量(数据)。
- 这带来了一些缺点,尤其是在分叉多线程程序方面。
- 一些带有 C 后端的库,如 Tensorflow、OpenCV 等,不是分叉安全的,会导致子进程以不确定的方式挂起。
产卵:
- 在不继承代码或数据的情况下为子进程创建一个新的解释器。
- 只有必要的 data/arguments 被发送到子进程。这意味着子进程不会自动使用变量、线程锁、文件描述符等——这避免了难以捕获的错误。
- 这种方法也有一些缺点 — 因为 data/arguments 需要发送到子进程,所以它们也必须是 pickle-able 的。某些具有内部 locks/mutex 的对象(例如队列)不可 pickle,并且 pickle 较重的对象(例如数据框和大型 numpy 数组)非常昂贵。
- 在子进程上取消选中对象将导致重新导入关联库(如果有)。这又增加了时间。
- 由于父代码未克隆到子进程中,因此在创建子进程时需要使用
if __name__ == '__main__'
守卫。不这样做会使子进程无法从父进程导入代码(现在 运行 为 main)。这也是为什么您的程序在与守卫一起使用时可以正常工作的原因。
如果您注意到 fork 会带来一些由您的程序或导入的非 fork 安全库引起的不可预测的影响,您可以:
- (a) 全局设置多处理上下文以使用 'fork' 方法:
import multiprocessing as mp
mp.set_start_method("fork")
请注意,这将设置全局上下文,您或任何其他导入的库一旦设置就无法更改此上下文。
- (b) 使用多处理的
get_context
方法在本地设置上下文:
import multiprocessing as mp
mp_fork = mp.get_context('fork')
# mp_fork has all the attributes of mp so you can do:
mp_fork.Process(...)
mp_fork.Pool(...)
# using local context will not change global behaviour:
# create child process using global context
# default is fork in < 3.8; spawn otherwise
mp.Process(...)
# most multiprocessing based functionality like ProcessPoolExecutor
# also take context as an argument:
executor=ProcessPoolExecutor(max_workers=1, mp_context=mp_fork)