Airflow 任务仅在第一个可用 运行
Airflow tasks only available on the first run
我的用例很简单:我有一些进程与数据库中的一些表进行交互。我想自动创建表,所以我添加了 create_table_if_not_exists_task
。我只想 运行 在第一个 DAG 运行 中执行该任务,但不在后续的 DAG 中执行,因为它占用了我本可以在其他地方使用的 DAG 时间/资源。
我的问题是:我在 Airflow 中有一个干净的方法来做到这一点吗?
我的想法是使用该信息更新 Airflow 变量并在 DAG 解析中检查它。不喜欢它,因为它会在每次心跳时创建到元数据数据库的连接。
您可以在要控制的任务前使用 ShortCircuitOperator
(甚至 BranchPythonOperator
,具体取决于您的管道),访问 dag_run
对象(直接或通过 context
) 在 Python 可调用文件中,然后使用 DagRun.get_previous_dagrun()
方法检查任何先前的 DagRun。像这样的东西(虽然未经测试):
def has_previous_dagrun(dag_run):
return False if dag_run.get_previous_dagrun() is not None else True
...
check_if_first_dagrun = ShortCircuitOperator(
task_id="check_if_first_dagrun",
python_callable=has_previous_dagrun,
)
我的用例很简单:我有一些进程与数据库中的一些表进行交互。我想自动创建表,所以我添加了 create_table_if_not_exists_task
。我只想 运行 在第一个 DAG 运行 中执行该任务,但不在后续的 DAG 中执行,因为它占用了我本可以在其他地方使用的 DAG 时间/资源。
我的问题是:我在 Airflow 中有一个干净的方法来做到这一点吗?
我的想法是使用该信息更新 Airflow 变量并在 DAG 解析中检查它。不喜欢它,因为它会在每次心跳时创建到元数据数据库的连接。
您可以在要控制的任务前使用 ShortCircuitOperator
(甚至 BranchPythonOperator
,具体取决于您的管道),访问 dag_run
对象(直接或通过 context
) 在 Python 可调用文件中,然后使用 DagRun.get_previous_dagrun()
方法检查任何先前的 DagRun。像这样的东西(虽然未经测试):
def has_previous_dagrun(dag_run):
return False if dag_run.get_previous_dagrun() is not None else True
...
check_if_first_dagrun = ShortCircuitOperator(
task_id="check_if_first_dagrun",
python_callable=has_previous_dagrun,
)