获取 airflow.models.dag get_last_dagrun() 的会话参数
Get session parameter for airflow.models.dag get_last_dagrun()
我正在尝试向我的自定义运算符传递一个参数,该参数是 dag 本身的最后 运行 时间。
按照文档,我明白我应该使用 dag.get_last_dagrun()
https://airflow.apache.org/docs/apache-airflow/1.10.6/_api/airflow/models/dag/index.html#airflow.models.dag.get_last_dagrun 。但是,我无法正确传递会话参数。
我在哪里可以找到这个?
使用不带参数的函数时,returnNone
。
我认为这是因为我自己触发了 Dag,因此我想将 include_externally_triggered
设置为 true。但是之前还是需要管理session参数的。
我尝试在创建 dag 之前以及定义任务时创建变量 last_run
。我想在任务内部,包含了 self 并且它会在不放置任何参数的情况下正确获取。
但是在 dag 之外的那个呢?
我也试过这个解决方案,它给我一个时间,即使它是我第一次 运行 Dag(我已经从 ui 清理了 dag 日志),也许它是当前的执行 DAG 时间戳?如果是,我需要比较日期是否相等才能豁免?
from airflow import DAG
from DAG.operators.custom_operator1 import customOperator1
last_run = dag.get_last_dagrun() #HERE
default_args = {
"owner": "admin",
"depends_on_past": False,
"email": ["email@email.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
}
with DAG(
dag_id="Custom",
schedule_interval="@once",
description="Desc",
start_date=datetime(2022, 3, 11),
catchup=False,
tags=["custom"],
default_args=default_args) as dag:
#Custom Operator
custom = customOperator1(
task_id = 'custom',
last_run = dag.get_last_dagrun() # OR HERE
)
custom
的实际答案:。包括当前 运行 Dag。因此,我稍微修改了函数以豁免其他状态不是'running'的dags当然,您可以为其他Dag状态添加其他条件:
enter image description here
现在,我可以获得最新的成功 Dag execution_date 来动态更新我的数据!
from airflow.models import DagRun
def get_last_exec_date(dag_id):
dag_runs = DagRun.find(dag_id=dag_id)
dags = []
for dag in dag_runs:
if dag.state == 'success':
dags.append(dag)
dags.sort(key=lambda x: x.execution_date, reverse=False)
return dags[0] if dags != [] else None
我正在尝试向我的自定义运算符传递一个参数,该参数是 dag 本身的最后 运行 时间。
按照文档,我明白我应该使用 dag.get_last_dagrun()
https://airflow.apache.org/docs/apache-airflow/1.10.6/_api/airflow/models/dag/index.html#airflow.models.dag.get_last_dagrun 。但是,我无法正确传递会话参数。
我在哪里可以找到这个?
使用不带参数的函数时,returnNone
。
我认为这是因为我自己触发了 Dag,因此我想将 include_externally_triggered
设置为 true。但是之前还是需要管理session参数的。
我尝试在创建 dag 之前以及定义任务时创建变量 last_run
。我想在任务内部,包含了 self 并且它会在不放置任何参数的情况下正确获取。
但是在 dag 之外的那个呢?
我也试过这个解决方案,它给我一个时间,即使它是我第一次 运行 Dag(我已经从 ui 清理了 dag 日志),也许它是当前的执行 DAG 时间戳?如果是,我需要比较日期是否相等才能豁免?
from airflow import DAG
from DAG.operators.custom_operator1 import customOperator1
last_run = dag.get_last_dagrun() #HERE
default_args = {
"owner": "admin",
"depends_on_past": False,
"email": ["email@email.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
}
with DAG(
dag_id="Custom",
schedule_interval="@once",
description="Desc",
start_date=datetime(2022, 3, 11),
catchup=False,
tags=["custom"],
default_args=default_args) as dag:
#Custom Operator
custom = customOperator1(
task_id = 'custom',
last_run = dag.get_last_dagrun() # OR HERE
)
custom
的实际答案:。包括当前 运行 Dag。因此,我稍微修改了函数以豁免其他状态不是'running'的dags当然,您可以为其他Dag状态添加其他条件: enter image description here
现在,我可以获得最新的成功 Dag execution_date 来动态更新我的数据!
from airflow.models import DagRun
def get_last_exec_date(dag_id):
dag_runs = DagRun.find(dag_id=dag_id)
dags = []
for dag in dag_runs:
if dag.state == 'success':
dags.append(dag)
dags.sort(key=lambda x: x.execution_date, reverse=False)
return dags[0] if dags != [] else None