获取 airflow.models.dag get_last_dagrun() 的会话参数

Get session parameter for airflow.models.dag get_last_dagrun()

我正在尝试向我的自定义运算符传递一个参数,该参数是 dag 本身的最后 运行 时间。

按照文档,我明白我应该使用 dag.get_last_dagrun() https://airflow.apache.org/docs/apache-airflow/1.10.6/_api/airflow/models/dag/index.html#airflow.models.dag.get_last_dagrun 。但是,我无法正确传递会话参数。
我在哪里可以找到这个?

使用不带参数的函数时,returnNone。 我认为这是因为我自己触发了 Dag,因此我想将 include_externally_triggered 设置为 true。但是之前还是需要管理session参数的。

我尝试在创建 dag 之前以及定义任务时创建变量 last_run。我想在任务内部,包含了 self 并且它会在不放置任何参数的情况下正确获取。 但是在 dag 之外的那个呢?

我也试过这个解决方案,它给我一个时间,即使它是我第一次 运行 Dag(我已经从 ui 清理了 dag 日志),也许它是当前的执行 DAG 时间戳?如果是,我需要比较日期是否相等才能豁免?

from airflow import DAG
from DAG.operators.custom_operator1 import customOperator1

last_run = dag.get_last_dagrun() #HERE

default_args = {
    "owner": "admin",
    "depends_on_past": False,
    "email": ["email@email.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
}

with DAG(
    dag_id="Custom",
    schedule_interval="@once",
    description="Desc",
    start_date=datetime(2022, 3, 11),
    catchup=False,
    tags=["custom"],
    default_args=default_args) as dag:

    #Custom Operator
    custom = customOperator1(
      task_id = 'custom',
      last_run = dag.get_last_dagrun() # OR HERE
    )

custom

的实际答案:。包括当前 运行 Dag。因此,我稍微修改了函数以豁免其他状态不是'running'的dags当然,您可以为其他Dag状态添加其他条件: enter image description here

现在,我可以获得最新的成功 Dag execution_date 来动态更新我的数据!

from airflow.models import DagRun

def get_last_exec_date(dag_id):
    dag_runs = DagRun.find(dag_id=dag_id)
    dags = []
    for dag in dag_runs:
        if dag.state == 'success':
            dags.append(dag)
    
    dags.sort(key=lambda x: x.execution_date, reverse=False)

    return dags[0] if dags != [] else None