如何获取手动触发 DAG 的 Airflow 用户?

How to get Airflow user who manually trigger a DAG?

在 Airflow UI 中,"Browser > Logs" 下可用的日志事件之一是事件 "Trigger" 以及 DAG ID 和 Owner/User 谁负责触发此事件事件。这些信息是否可以通过编程方式轻松获得?

用例是,我有一个允许一部分用户手动触发执行的 DAG。根据触发此 DAG 执行的用户,此 DAG 的代码执行行为将有所不同。

提前致谢。

您可以直接从 Airflow 元数据数据库中的 Log table 中获取它,如下所示:

from airflow.models.log import Log
from airflow.utils.db import create_session

with create_session() as session:
   results = session.query(Log.dttm, Log.dag_id, Log.execution_date, Log.owner, Log.extra).filter(Log.dag_id == 'example_trigger_target_dag', Log.event == 'trigger').all()

# Get top 2 records
results[2]

输出:

(datetime.datetime(2020, 3, 30, 23, 16, 52, 487095, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>),
 'example_trigger_target_dag',
 None,
 'admin',
 '[(\'dag_id\', \'example_trigger_target_dag\'), (\'origin\', \'/tree?dag_id=example_trigger_target_dag\'), (\'csrf_token\', \'IjhmYzQ4MGU2NGFjMzg2ZWI3ZjgyMTA1MWM3N2RhYmZiOThkOTFhMTYi.XoJ92A.5q35ClFnQjKRiWwata8dNlVs-98\'), (\'conf\', \'{"message": "kaxil"}\')]')

我稍微修改一下之前的回答:

    with create_session() as session:
       results = session.query(Log.dttm, Log.dag_id, Log.execution_date, 
       Log.owner, Log.extra)\
       .filter(Log.dag_id == 'dag_id', Log.event == 
       'trigger').order_by(Log.dttm.desc()).all()