如何在 Big Query 表中显示 Airflow DAG 状态

How to display Airflow DAG status in Big Query tables

我想向 BQ 中的 table 显示 DAG(气流)最终状态(success/Failure)。 这样 table 可以包含:日期时间、DAG 名称、状态等列,它将根据 DAG 的最终状态进行填充。

请帮忙;如何实现?

Airflow 中没有原生的 out-of-the-box 方法来实现这一点。但是,您可以自己实现一个函数,通过 DAG 的 on_success_callbackon_failure_callback 方法将数据写入 BigQuery 和 运行。

注意:BigQuery 不是事务型数据库,并且对每天的插入次数有限制。对于大量 DAG 运行,您可能会考虑将结果批量写入 BigQuery。

如果您需要 real-time 中的数据,我会按照@Bas 建议的方法进行一些操作,可能使用 firestore 或 Cloud SQL。但是,如果您使用 BigQuery,请注意他对每天插入的评论。

如果您可以每天等待结果,您可以按照此处所述将日志汇入 BigQuery: https://cloud.google.com/bigquery/docs/reference/auditlogs#stackdriver_logging_exports

在过滤条件中,您可以引入所有 Airflow 日志,也可以只引入 worker/scheduler.

中的日志

Ex 条件:

resource.type="cloud_composer_environment"
logName="projects/{YOUR-PROJECT}/logs/airflow-worker"

在日志 textPayload 中,您会看到如下内容:

Marking task as SUCCESS. dag_id=thing, task_id=stuff, executiondate=20220307T111111, start_date=20220307T114858, end_date=20220307T114859

然后您可以在 BigQuery 中解析您需要的内容

补充用户Bas Harenslak的回答。您还可以探索这些选项:

  • 你可以利用TriggerDagRunOperator。通过使用它,您可以拥有一个 dag(一个 recap-dag),您的 DAG 将引用它来将记录填充到目标数据集中。
trigger_recap_dag = TriggerDagRunOperator(
      task_id="trigger_recap_dag",
      trigger_dag_id="recap-dag",
      wait_for_completion=False,
      allowed_states=['success']
      conf='{"Time": datetime.now() ,"DAG": "recap-dag","Status":"success"}'
  )

ingestion >> transformation >> save >> send_notification >> trigger_recap_dag 
  • 如果您认为合适,此 recap-dag 也可以是独立的,并且仅 运行 每 hour/day/week 您的选举并检查您的 DAG 状态。
with DAG(
  'recap-dag',
  schedule_interval='@daily',
  start_date=datetime(2021, 1, 1),
  catchup=False, 
) as dag:

  ...
  # Airflow >= 2.0.0
  # Inside a python Operator
  def GetRunningDagsInfo():
     dag_runs = DagRun.find(
       dag_id=your_dag_id,
       execution_start_date=your_start_date
       execution_end_date=your_end_date
     )
    
  ...
  • 您可以利用先前的选项并提供如下解决方案:

After you dag (or dags) complete, it will fire the trigger dag. this recap-dag will saves your dag records into a custom table or file and then your independent DAG runs and retrieves the datasets that have been created so far and push the data into your BigQuery Table.

  • 另一种选择是查看您的 Airflow 数据库以检索 运行ning 信息。已知为 Data Profiling。出于安全考虑,它已在最新版本中弃用。