如何将 PostgresOperator DAG 标记为条件失败?

How to mark PostgresOperator DAG as failed on condition?

我每天在特定时间有一个 Airflow DAG 运行。它是一个 PG 运算符并获取一个 table。像下面这样:

select count(*) from my_table where date_insert='{{ds}}'

现在我需要做的是,如果此计数为零,则将此任务标记为失败。所以我猜我需要 return 从 DAG 到 Python 的数据。

这是与 PostgreSQL 兼容的 SQLCheckOperator 用例:

from airflow.operators.sql import SQLCheckOperator
SQLCheckOperator(
    conn_id='your_conn',
    sql="""select count(*) from my_table where date_insert='{{ds}}'""",
    
)

如果count==0.

,运算符将引发AirflowException