Airflow:发布一个动态创建的 dag

Airflow : Publish a dynamically created dag

我希望能够从不受调度程序控制的代码中发布和触发 DAG 对象(即 $AIRFLOW_HOME/dags 文件夹)

我最后的办法是以编程方式创建一个包含我要发布的 DAG 定义的 py 文件,并将该文件保存到 $AIRFLOW_HOME/dags 文件夹中。 我相信它应该比那更容易。

下面是我试过的。

import airflow
from airflow import DAG
from datetime import timedelta

from airflow.models import DagPickle
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.db import provide_session


@provide_session
def submit_dag(session=None):
    args = {
        'owner': 'airflow',
        'start_date': airflow.utils.dates.days_ago(2)
    }

    dag = DAG(
        dag_id='sample', default_args=args,
        schedule_interval=None, start_date=airflow.utils.dates.days_ago(2),
        dagrun_timeout=timedelta(minutes=60))
    task = DummyOperator(task_id='one', dag=dag)
    dag_pickle = DagPickle(task)
    session.add(dag_pickle)
    session.commit()


submit_dag()

上面的代码确实在 dag_pickle table 中创建了条目,但是我如何发布并稍后触发这个 dag?

我可以 pickle.dump(dag,open(DAGS_FOLDER/pickled_dags,'wb')) 并且在 DAGS 文件夹中有一个文件 pickle.load(DAGS_FOLDER/pickled_dags)