为什么 2 个并行任务具有相同的 pid?

Why 2 parallel tasks have same pid?

我刚刚将 airflow.cfg 配置为与 LocalExecutor 一起工作:

    executor = LocalExecutor
    sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
    sql_engine_encoding = utf-8

初始化数据库后,我运行以下DAG:

with DAG(dag_id='a_example_parallel_v', schedule_interval=None, start_date=days_ago(2),) as dag:

    def task1_func(ti):
        print(f"pid: ({[os.getgid()]}task1: print from task 1")

    def task2_func(ti):
        print(f"pid: ({[os.getgid()]}task1: print from task 2")

    def task3_func(ti):
        print(f"pid: ({[os.getgid()]}task1: print from task 3")

    def task4_func(ti):
        print(f"pid: ({[os.getgid()]}task1: print from task 4")


    task1 = PythonOperator(task_id='task1', python_callable=task1_func, provide_context=True)
    task2 = PythonOperator(task_id='task2', python_callable=task2_func, provide_context=True)
    task3 = PythonOperator(task_id='task3', python_callable=task3_func, provide_context=True)
    task4 = PythonOperator(task_id='task4', python_callable=task4_func, provide_context=True)

task1 >> task2
task1 >> task3
task2 >> task4
task3 >> task4

如何配置气流,以便任务 2 和任务 3 运行 并行?

Airflow 在 airflow.cfg 中有设置:

execute_tasks_new_python_interpreter - 任务是否应该通过父进程的分支(“False”,更快的选项)或通过产生一个新的 python 进程(“True”慢,但意味着插件任务立即拾取更改)

可以在添加该功能的 PR 中找到原因: 生成一个全新的 python 进程,然后 re-loading 所有 Airflow 太贵了。尽管这一切时间都变得微不足道了很久 运行 个任务,这种延迟给新用户带来了“糟糕”的体验 他们只是第一次尝试 Airflow。