Airflow:发布一个动态创建的 dag
Airflow : Publish a dynamically created dag
我希望能够从不受调度程序控制的代码中发布和触发 DAG 对象(即 $AIRFLOW_HOME/dags 文件夹)
我最后的办法是以编程方式创建一个包含我要发布的 DAG 定义的 py 文件,并将该文件保存到 $AIRFLOW_HOME/dags 文件夹中。
我相信它应该比那更容易。
下面是我试过的。
import airflow
from airflow import DAG
from datetime import timedelta
from airflow.models import DagPickle
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.db import provide_session
@provide_session
def submit_dag(session=None):
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='sample', default_args=args,
schedule_interval=None, start_date=airflow.utils.dates.days_ago(2),
dagrun_timeout=timedelta(minutes=60))
task = DummyOperator(task_id='one', dag=dag)
dag_pickle = DagPickle(task)
session.add(dag_pickle)
session.commit()
submit_dag()
上面的代码确实在 dag_pickle table 中创建了条目,但是我如何发布并稍后触发这个 dag?
我可以 pickle.dump(dag,open(DAGS_FOLDER/pickled_dags,'wb')) 并且在 DAGS 文件夹中有一个文件 pickle.load(DAGS_FOLDER/pickled_dags)
我希望能够从不受调度程序控制的代码中发布和触发 DAG 对象(即 $AIRFLOW_HOME/dags 文件夹)
我最后的办法是以编程方式创建一个包含我要发布的 DAG 定义的 py 文件,并将该文件保存到 $AIRFLOW_HOME/dags 文件夹中。 我相信它应该比那更容易。
下面是我试过的。
import airflow
from airflow import DAG
from datetime import timedelta
from airflow.models import DagPickle
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.db import provide_session
@provide_session
def submit_dag(session=None):
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='sample', default_args=args,
schedule_interval=None, start_date=airflow.utils.dates.days_ago(2),
dagrun_timeout=timedelta(minutes=60))
task = DummyOperator(task_id='one', dag=dag)
dag_pickle = DagPickle(task)
session.add(dag_pickle)
session.commit()
submit_dag()
上面的代码确实在 dag_pickle table 中创建了条目,但是我如何发布并稍后触发这个 dag?
我可以 pickle.dump(dag,open(DAGS_FOLDER/pickled_dags,'wb')) 并且在 DAGS 文件夹中有一个文件 pickle.load(DAGS_FOLDER/pickled_dags)