如何在 Big Query 表中显示 Airflow DAG 状态
How to display Airflow DAG status in Big Query tables
我想向 BQ 中的 table 显示 DAG(气流)最终状态(success/Failure)。
这样 table 可以包含:日期时间、DAG 名称、状态等列,它将根据 DAG 的最终状态进行填充。
请帮忙;如何实现?
Airflow 中没有原生的 out-of-the-box 方法来实现这一点。但是,您可以自己实现一个函数,通过 DAG 的 on_success_callback
和 on_failure_callback
方法将数据写入 BigQuery 和 运行。
注意:BigQuery 不是事务型数据库,并且对每天的插入次数有限制。对于大量 DAG 运行,您可能会考虑将结果批量写入 BigQuery。
如果您需要 real-time 中的数据,我会按照@Bas 建议的方法进行一些操作,可能使用 firestore 或 Cloud SQL。但是,如果您使用 BigQuery,请注意他对每天插入的评论。
如果您可以每天等待结果,您可以按照此处所述将日志汇入 BigQuery:
https://cloud.google.com/bigquery/docs/reference/auditlogs#stackdriver_logging_exports
在过滤条件中,您可以引入所有 Airflow 日志,也可以只引入 worker/scheduler.
中的日志
Ex 条件:
resource.type="cloud_composer_environment"
logName="projects/{YOUR-PROJECT}/logs/airflow-worker"
在日志 textPayload 中,您会看到如下内容:
Marking task as SUCCESS. dag_id=thing, task_id=stuff, executiondate=20220307T111111, start_date=20220307T114858, end_date=20220307T114859
然后您可以在 BigQuery 中解析您需要的内容
补充用户Bas Harenslak
的回答。您还可以探索这些选项:
- 你可以利用TriggerDagRunOperator。通过使用它,您可以拥有一个 dag(一个
recap-dag
),您的 DAG 将引用它来将记录填充到目标数据集中。
trigger_recap_dag = TriggerDagRunOperator(
task_id="trigger_recap_dag",
trigger_dag_id="recap-dag",
wait_for_completion=False,
allowed_states=['success']
conf='{"Time": datetime.now() ,"DAG": "recap-dag","Status":"success"}'
)
ingestion >> transformation >> save >> send_notification >> trigger_recap_dag
- 如果您认为合适,此
recap-dag
也可以是独立的,并且仅 运行 每 hour/day/week 您的选举并检查您的 DAG 状态。
with DAG(
'recap-dag',
schedule_interval='@daily',
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
...
# Airflow >= 2.0.0
# Inside a python Operator
def GetRunningDagsInfo():
dag_runs = DagRun.find(
dag_id=your_dag_id,
execution_start_date=your_start_date
execution_end_date=your_end_date
)
...
- 您可以利用先前的选项并提供如下解决方案:
After you dag (or dags) complete, it will fire the trigger dag. this recap-dag
will saves your dag records into a custom table or
file and then your independent DAG runs and retrieves the datasets
that have been created so far and push the data into your BigQuery
Table.
- 另一种选择是查看您的 Airflow 数据库以检索 运行ning 信息。已知为 Data Profiling。出于安全考虑,它已在最新版本中弃用。
我想向 BQ 中的 table 显示 DAG(气流)最终状态(success/Failure)。 这样 table 可以包含:日期时间、DAG 名称、状态等列,它将根据 DAG 的最终状态进行填充。
请帮忙;如何实现?
Airflow 中没有原生的 out-of-the-box 方法来实现这一点。但是,您可以自己实现一个函数,通过 DAG 的 on_success_callback
和 on_failure_callback
方法将数据写入 BigQuery 和 运行。
注意:BigQuery 不是事务型数据库,并且对每天的插入次数有限制。对于大量 DAG 运行,您可能会考虑将结果批量写入 BigQuery。
如果您需要 real-time 中的数据,我会按照@Bas 建议的方法进行一些操作,可能使用 firestore 或 Cloud SQL。但是,如果您使用 BigQuery,请注意他对每天插入的评论。
如果您可以每天等待结果,您可以按照此处所述将日志汇入 BigQuery: https://cloud.google.com/bigquery/docs/reference/auditlogs#stackdriver_logging_exports
在过滤条件中,您可以引入所有 Airflow 日志,也可以只引入 worker/scheduler.
中的日志Ex 条件:
resource.type="cloud_composer_environment"
logName="projects/{YOUR-PROJECT}/logs/airflow-worker"
在日志 textPayload 中,您会看到如下内容:
Marking task as SUCCESS. dag_id=thing, task_id=stuff, executiondate=20220307T111111, start_date=20220307T114858, end_date=20220307T114859
然后您可以在 BigQuery 中解析您需要的内容
补充用户Bas Harenslak
的回答。您还可以探索这些选项:
- 你可以利用TriggerDagRunOperator。通过使用它,您可以拥有一个 dag(一个
recap-dag
),您的 DAG 将引用它来将记录填充到目标数据集中。
trigger_recap_dag = TriggerDagRunOperator(
task_id="trigger_recap_dag",
trigger_dag_id="recap-dag",
wait_for_completion=False,
allowed_states=['success']
conf='{"Time": datetime.now() ,"DAG": "recap-dag","Status":"success"}'
)
ingestion >> transformation >> save >> send_notification >> trigger_recap_dag
- 如果您认为合适,此
recap-dag
也可以是独立的,并且仅 运行 每 hour/day/week 您的选举并检查您的 DAG 状态。
with DAG(
'recap-dag',
schedule_interval='@daily',
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
...
# Airflow >= 2.0.0
# Inside a python Operator
def GetRunningDagsInfo():
dag_runs = DagRun.find(
dag_id=your_dag_id,
execution_start_date=your_start_date
execution_end_date=your_end_date
)
...
- 您可以利用先前的选项并提供如下解决方案:
After you dag (or dags) complete, it will fire the trigger dag. this
recap-dag
will saves your dag records into a custom table or file and then your independent DAG runs and retrieves the datasets that have been created so far and push the data into your BigQuery Table.
- 另一种选择是查看您的 Airflow 数据库以检索 运行ning 信息。已知为 Data Profiling。出于安全考虑,它已在最新版本中弃用。