气流以编程方式取消暂停 dag?

Airflow unpause dag programmatically?

我有一个 dag,我们将部署到多个不同的气流实例,在我们的 airflow.cfg 中我们有 dags_are_paused_at_creation = True 但对于这个特定的 dag,我们希望它无需执行即可打开所以手动点击 UI。有没有办法以编程方式做到这一点?

如果其他人遇到此问题,我创建了以下函数:

import airflow.settings
from airflow.models import DagModel
def unpause_dag(dag):
    """
    A way to programatically unpause a DAG.
    :param dag: DAG object
    :return: dag.is_paused is now False
    """
    session = airflow.settings.Session()
    try:
        qry = session.query(DagModel).filter(DagModel.dag_id == dag.dag_id)
        d = qry.first()
        d.is_paused = False
        session.commit()
    except:
        session.rollback()
    finally:
        session.close()

在命令行中提供 dag_id 和 运行 此命令。

airflow pause dag_id.

有关 airflow 命令行界面的更多信息:https://airflow.incubator.apache.org/cli.html

airflow-rest-api-plugin 插件也可用于以编程方式暂停任务。

Pauses a DAG

Available in Airflow Version: 1.7.0 or greater

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=pause

Query Arguments:

dag_id - string - The id of the dag

subdir (optional) - string - File location or directory from which to look for the dag

Examples:

http://{HOST}:{PORT}/admin/rest_api/api?api=pause&dag_id=test_id

查看更多详情: https://github.com/teamclairvoyant/airflow-rest-api-plugin

我认为您正在寻找 unpause(不是 pause

airflow unpause DAG_ID

根据最近的文档,以下 cli 命令应该可以工作。

airflow dags unpause dag_id

https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#unpause

Airflow 的 REST API 提供了一种使用 DAG 补丁的方法 API:我们需要使用查询参数 ?update_mask=is_paused 更新 dag 并将布尔值作为请求正文发送。

参考:https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/patch_dag