尝试在 Airflow DAG 中引发异常并获得 "Broken DAG"

Trying to raise exception in Airflow DAG and getting "Broken DAG"

如果 cursor.rowcount > 0 为真,我希望我的 DAG 失败(而 运行)。 这是代码:

def execute(self):
        self.hook = PostgresHook(postgres_conn_id=self.dest_redshift_conn_id)
        conn = self.hook.get_conn()
        cursor = conn.cursor()
        log.info("Connected with " + self.dest_redshift_conn_id)
        log.info('Starting Query')
        print('Starting Query \n')
        cursor.execute("begin transaction;")
        sql_comm = self.read_queries_file()
        for command in sql_comm:
            sql_statement = command
            if sql_statement:
                log.info(sql_statement)
                log.info('\n')
                cursor.execute(sql_statement)
                res = cursor.fetchmany(10)
                if cursor.rowcount > 0:
                    raise Exception(res)
                else:
                    log.info('\nNo result found in the query')
        cursor.execute(" end transaction;")
        cursor.close()
        log.info("Select process is completed".format(self.dest_table))

我已将它推送到 Airflow,但在没有 运行 DAG 的情况下重新启动 Airflow 服务后出现“Broken DAG”错误。 Error print screen

我怎样才能使我的代码中的 DAG 失败并且 Airflow 不会中断? 已经尝试更改异常类型但没有帮助。

执行方法应该接收上下文参数。尝试使用 firm as

方法重写您的代码
def execute(self, context):
  ...

我终于将 def execute(self, context) 更改为 def execute(self, **context),现在可以使用了