如何获取手动触发 DAG 的 Airflow 用户?
How to get Airflow user who manually trigger a DAG?
在 Airflow UI 中,"Browser > Logs" 下可用的日志事件之一是事件 "Trigger" 以及 DAG ID 和 Owner/User 谁负责触发此事件事件。这些信息是否可以通过编程方式轻松获得?
用例是,我有一个允许一部分用户手动触发执行的 DAG。根据触发此 DAG 执行的用户,此 DAG 的代码执行行为将有所不同。
提前致谢。
您可以直接从 Airflow 元数据数据库中的 Log
table 中获取它,如下所示:
from airflow.models.log import Log
from airflow.utils.db import create_session
with create_session() as session:
results = session.query(Log.dttm, Log.dag_id, Log.execution_date, Log.owner, Log.extra).filter(Log.dag_id == 'example_trigger_target_dag', Log.event == 'trigger').all()
# Get top 2 records
results[2]
输出:
(datetime.datetime(2020, 3, 30, 23, 16, 52, 487095, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
'example_trigger_target_dag',
None,
'admin',
'[(\'dag_id\', \'example_trigger_target_dag\'), (\'origin\', \'/tree?dag_id=example_trigger_target_dag\'), (\'csrf_token\', \'IjhmYzQ4MGU2NGFjMzg2ZWI3ZjgyMTA1MWM3N2RhYmZiOThkOTFhMTYi.XoJ92A.5q35ClFnQjKRiWwata8dNlVs-98\'), (\'conf\', \'{"message": "kaxil"}\')]')
我稍微修改一下之前的回答:
with create_session() as session:
results = session.query(Log.dttm, Log.dag_id, Log.execution_date,
Log.owner, Log.extra)\
.filter(Log.dag_id == 'dag_id', Log.event ==
'trigger').order_by(Log.dttm.desc()).all()
在 Airflow UI 中,"Browser > Logs" 下可用的日志事件之一是事件 "Trigger" 以及 DAG ID 和 Owner/User 谁负责触发此事件事件。这些信息是否可以通过编程方式轻松获得?
用例是,我有一个允许一部分用户手动触发执行的 DAG。根据触发此 DAG 执行的用户,此 DAG 的代码执行行为将有所不同。
提前致谢。
您可以直接从 Airflow 元数据数据库中的 Log
table 中获取它,如下所示:
from airflow.models.log import Log
from airflow.utils.db import create_session
with create_session() as session:
results = session.query(Log.dttm, Log.dag_id, Log.execution_date, Log.owner, Log.extra).filter(Log.dag_id == 'example_trigger_target_dag', Log.event == 'trigger').all()
# Get top 2 records
results[2]
输出:
(datetime.datetime(2020, 3, 30, 23, 16, 52, 487095, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
'example_trigger_target_dag',
None,
'admin',
'[(\'dag_id\', \'example_trigger_target_dag\'), (\'origin\', \'/tree?dag_id=example_trigger_target_dag\'), (\'csrf_token\', \'IjhmYzQ4MGU2NGFjMzg2ZWI3ZjgyMTA1MWM3N2RhYmZiOThkOTFhMTYi.XoJ92A.5q35ClFnQjKRiWwata8dNlVs-98\'), (\'conf\', \'{"message": "kaxil"}\')]')
我稍微修改一下之前的回答:
with create_session() as session:
results = session.query(Log.dttm, Log.dag_id, Log.execution_date,
Log.owner, Log.extra)\
.filter(Log.dag_id == 'dag_id', Log.event ==
'trigger').order_by(Log.dttm.desc()).all()