为什么 2 个并行任务具有相同的 pid?
Why 2 parallel tasks have same pid?
我刚刚将 airflow.cfg
配置为与 LocalExecutor
一起工作:
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
sql_engine_encoding = utf-8
初始化数据库后,我运行以下DAG:
with DAG(dag_id='a_example_parallel_v', schedule_interval=None, start_date=days_ago(2),) as dag:
def task1_func(ti):
print(f"pid: ({[os.getgid()]}task1: print from task 1")
def task2_func(ti):
print(f"pid: ({[os.getgid()]}task1: print from task 2")
def task3_func(ti):
print(f"pid: ({[os.getgid()]}task1: print from task 3")
def task4_func(ti):
print(f"pid: ({[os.getgid()]}task1: print from task 4")
task1 = PythonOperator(task_id='task1', python_callable=task1_func, provide_context=True)
task2 = PythonOperator(task_id='task2', python_callable=task2_func, provide_context=True)
task3 = PythonOperator(task_id='task3', python_callable=task3_func, provide_context=True)
task4 = PythonOperator(task_id='task4', python_callable=task4_func, provide_context=True)
task1 >> task2
task1 >> task3
task2 >> task4
task3 >> task4
我检查了任务 2 和任务 3 的日志,它们打印了相同的 PID。
如果任务 2 和任务 3 的 PID 相同,则表示它们没有运行并行。
如何配置气流,以便任务 2 和任务 3 运行 并行?
Airflow 在 airflow.cfg
中有设置:
execute_tasks_new_python_interpreter
- 任务是否应该通过父进程的分支(“False”,更快的选项)或通过产生一个新的 python 进程(“True”慢,但意味着插件任务立即拾取更改)
可以在添加该功能的 PR 中找到原因:
生成一个全新的 python 进程,然后 re-loading 所有 Airflow
太贵了。尽管这一切时间都变得微不足道了很久
运行 个任务,这种延迟给新用户带来了“糟糕”的体验
他们只是第一次尝试 Airflow。
我刚刚将 airflow.cfg
配置为与 LocalExecutor
一起工作:
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
sql_engine_encoding = utf-8
初始化数据库后,我运行以下DAG:
with DAG(dag_id='a_example_parallel_v', schedule_interval=None, start_date=days_ago(2),) as dag:
def task1_func(ti):
print(f"pid: ({[os.getgid()]}task1: print from task 1")
def task2_func(ti):
print(f"pid: ({[os.getgid()]}task1: print from task 2")
def task3_func(ti):
print(f"pid: ({[os.getgid()]}task1: print from task 3")
def task4_func(ti):
print(f"pid: ({[os.getgid()]}task1: print from task 4")
task1 = PythonOperator(task_id='task1', python_callable=task1_func, provide_context=True)
task2 = PythonOperator(task_id='task2', python_callable=task2_func, provide_context=True)
task3 = PythonOperator(task_id='task3', python_callable=task3_func, provide_context=True)
task4 = PythonOperator(task_id='task4', python_callable=task4_func, provide_context=True)
task1 >> task2
task1 >> task3
task2 >> task4
task3 >> task4
我检查了任务 2 和任务 3 的日志,它们打印了相同的 PID。
如果任务 2 和任务 3 的 PID 相同,则表示它们没有运行并行。
如何配置气流,以便任务 2 和任务 3 运行 并行?
Airflow 在 airflow.cfg
中有设置:
execute_tasks_new_python_interpreter
- 任务是否应该通过父进程的分支(“False”,更快的选项)或通过产生一个新的 python 进程(“True”慢,但意味着插件任务立即拾取更改)
可以在添加该功能的 PR 中找到原因: 生成一个全新的 python 进程,然后 re-loading 所有 Airflow 太贵了。尽管这一切时间都变得微不足道了很久 运行 个任务,这种延迟给新用户带来了“糟糕”的体验 他们只是第一次尝试 Airflow。