Is multiprocessing.Pool not allowed in Airflow task? - AssertionError: daemonic processes are not allowed to have children
Is multiprocessing.Pool not allowed in Airflow task? - AssertionError: daemonic processes are not allowed to have children
我们的气流项目有一个从 BigQuery 查询并使用 Pool
并行转储到本地 JSON 文件的任务:
def dump_in_parallel(table_name):
base_query = f"select * from models.{table_name}"
all_conf_ids = range(1,10)
n_jobs = 4
with Pool(n_jobs) as p:
p.map(partial(dump_conf_id, base_query = base_query), all_conf_ids)
with open("/tmp/final_output.json", "wb") as f:
filenames = [f'/tmp/output_file_{i}.json' for i in all_conf_ids]
此任务在 airflow v1.10 中对我们来说工作正常,但在 v2.1+ 中不再工作。此处的第 2.1 节 - https://blog.mbedded.ninja/programming/languages/python/python-multiprocessing/ - 提到 “如果您尝试从已经使用池创建的 child 工作人员中创建池,您将 运行 进入错误:守护进程不允许有 children"
这是完整的 Airflow 错误:
[2021-08-22 02:11:53,064] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 150, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 161, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/plugins/tasks/bigquery.py", line 249, in dump_in_parallel
with Pool(n_jobs) as p:
File "/usr/local/lib/python3.7/multiprocessing/context.py", line 119, in Pool
context=self.get_context())
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
self._repopulate_pool()
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
w.start()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 110, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
如果重要,我们 运行 使用 LocalExecutor 气流。知道为什么这个使用 Pool 的任务会在 airflow v1.10 中工作但不再在 airflow 2.1 中工作吗?
根据 https://github.com/celery/celery/issues/4525,将 multiprocessing
库替换为 billiard
库有效。我们不知道为什么将一个库替换为另一个库可以解决此问题...
Airflow 2 在后台使用不同的处理模型来加速处理,同时在 运行ning 任务之间保持基于进程的隔离。
这就是为什么它在 运行 任务的钩子下使用 forking
和多处理,但这也意味着如果您正在使用多处理,您将达到 Python 多处理的限制不允许链接多处理。
我不是 100% 确定它是否有效,但您可以尝试将 execute_tasks_new_python_interpreter
配置设置为 True。 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#execute-tasks-new-python-interpreter。此设置将导致 airflow 在 运行ning 任务而不是 forking/using 多处理时启动新的 Python 解释器(尽管我不是 100% 确定后者)。尽管 运行 你的任务会慢很多(最多几秒钟的开销),因为新的 Python 解释器必须在 运行 之前重新初始化并导入所有气流代码完成你的任务。
如果这不起作用,那么您可以使用 PythonVirtualenvOperator 午餐您的多处理作业 - 它将启动一个新的 Python 解释器到 运行 您的 python代码,您应该能够使用多处理。
您可以切换到带有锁定后端的 joblib python 库,并在执行后使用以下 instructions.
终止守护进程
我们的气流项目有一个从 BigQuery 查询并使用 Pool
并行转储到本地 JSON 文件的任务:
def dump_in_parallel(table_name):
base_query = f"select * from models.{table_name}"
all_conf_ids = range(1,10)
n_jobs = 4
with Pool(n_jobs) as p:
p.map(partial(dump_conf_id, base_query = base_query), all_conf_ids)
with open("/tmp/final_output.json", "wb") as f:
filenames = [f'/tmp/output_file_{i}.json' for i in all_conf_ids]
此任务在 airflow v1.10 中对我们来说工作正常,但在 v2.1+ 中不再工作。此处的第 2.1 节 - https://blog.mbedded.ninja/programming/languages/python/python-multiprocessing/ - 提到 “如果您尝试从已经使用池创建的 child 工作人员中创建池,您将 运行 进入错误:守护进程不允许有 children"
这是完整的 Airflow 错误:
[2021-08-22 02:11:53,064] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 150, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 161, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/plugins/tasks/bigquery.py", line 249, in dump_in_parallel
with Pool(n_jobs) as p:
File "/usr/local/lib/python3.7/multiprocessing/context.py", line 119, in Pool
context=self.get_context())
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 176, in __init__
self._repopulate_pool()
File "/usr/local/lib/python3.7/multiprocessing/pool.py", line 241, in _repopulate_pool
w.start()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 110, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
如果重要,我们 运行 使用 LocalExecutor 气流。知道为什么这个使用 Pool 的任务会在 airflow v1.10 中工作但不再在 airflow 2.1 中工作吗?
根据 https://github.com/celery/celery/issues/4525,将 multiprocessing
库替换为 billiard
库有效。我们不知道为什么将一个库替换为另一个库可以解决此问题...
Airflow 2 在后台使用不同的处理模型来加速处理,同时在 运行ning 任务之间保持基于进程的隔离。
这就是为什么它在 运行 任务的钩子下使用 forking
和多处理,但这也意味着如果您正在使用多处理,您将达到 Python 多处理的限制不允许链接多处理。
我不是 100% 确定它是否有效,但您可以尝试将 execute_tasks_new_python_interpreter
配置设置为 True。 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#execute-tasks-new-python-interpreter。此设置将导致 airflow 在 运行ning 任务而不是 forking/using 多处理时启动新的 Python 解释器(尽管我不是 100% 确定后者)。尽管 运行 你的任务会慢很多(最多几秒钟的开销),因为新的 Python 解释器必须在 运行 之前重新初始化并导入所有气流代码完成你的任务。
如果这不起作用,那么您可以使用 PythonVirtualenvOperator 午餐您的多处理作业 - 它将启动一个新的 Python 解释器到 运行 您的 python代码,您应该能够使用多处理。
您可以切换到带有锁定后端的 joblib python 库,并在执行后使用以下 instructions.
终止守护进程