如何将 PostgresOperator DAG 标记为条件失败?
How to mark PostgresOperator DAG as failed on condition?
我每天在特定时间有一个 Airflow DAG 运行。它是一个 PG 运算符并获取一个 table。像下面这样:
select count(*) from my_table where date_insert='{{ds}}'
现在我需要做的是,如果此计数为零,则将此任务标记为失败。所以我猜我需要 return 从 DAG 到 Python 的数据。
这是与 PostgreSQL 兼容的 SQLCheckOperator 用例:
from airflow.operators.sql import SQLCheckOperator
SQLCheckOperator(
conn_id='your_conn',
sql="""select count(*) from my_table where date_insert='{{ds}}'""",
)
如果count==0
.
,运算符将引发AirflowException
我每天在特定时间有一个 Airflow DAG 运行。它是一个 PG 运算符并获取一个 table。像下面这样:
select count(*) from my_table where date_insert='{{ds}}'
现在我需要做的是,如果此计数为零,则将此任务标记为失败。所以我猜我需要 return 从 DAG 到 Python 的数据。
这是与 PostgreSQL 兼容的 SQLCheckOperator 用例:
from airflow.operators.sql import SQLCheckOperator
SQLCheckOperator(
conn_id='your_conn',
sql="""select count(*) from my_table where date_insert='{{ds}}'""",
)
如果count==0
.
AirflowException