Airflow中如何获取DAG链执行时间?
How to get the DAG chain execution time in Airflow?
假设我有两个 DAG,其中 dag2 使用 TriggerDagRunOperator 作为其流程的一部分执行 dag1,如下所示:
- dag1:任务 1 > 任务 2 > 任务 3
- dag2:任务 4 > dag1 > 任务 5
现在假设 dag2 安排在每天下午 5 点举行一次。
当我是 运行 dag1 时,有没有办法让我获取 dag2(父 DAG)的执行时间戳?
是否有任何内置参数保存该值?
如果发生了什么事情并且 dag2 比平时更晚触发,比如说当天下午 6 点,那么我仍然想获得原始调度时间 - 即我在 dag1 时的下午 5 点。
将函数传递给 TriggerDagRunOperator
的 python_callable
参数,将 execution_date
注入触发的 DAG:
def inject_execution_date(context, dag_run_obj):
dag_run_obj.payload = {"parent_execution_date": context["execution_date"]}
return dag_run_obj
[...]
trigger_dro = TriggerDagRunOperator(python_callable=inject_execution_date, [...])
您可以使用 context["conf"]["parent_execution_date"]
在子 DAG 中访问它
假设我有两个 DAG,其中 dag2 使用 TriggerDagRunOperator 作为其流程的一部分执行 dag1,如下所示:
- dag1:任务 1 > 任务 2 > 任务 3
- dag2:任务 4 > dag1 > 任务 5
现在假设 dag2 安排在每天下午 5 点举行一次。 当我是 运行 dag1 时,有没有办法让我获取 dag2(父 DAG)的执行时间戳? 是否有任何内置参数保存该值?
如果发生了什么事情并且 dag2 比平时更晚触发,比如说当天下午 6 点,那么我仍然想获得原始调度时间 - 即我在 dag1 时的下午 5 点。
将函数传递给 TriggerDagRunOperator
的 python_callable
参数,将 execution_date
注入触发的 DAG:
def inject_execution_date(context, dag_run_obj):
dag_run_obj.payload = {"parent_execution_date": context["execution_date"]}
return dag_run_obj
[...]
trigger_dro = TriggerDagRunOperator(python_callable=inject_execution_date, [...])
您可以使用 context["conf"]["parent_execution_date"]