Airflow - log the user who triggered the dag

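I am trying to log the user who triggered my DAG, which kills hanging Postgres queries, but it doesn't work. Can you help me see what is wrong? What am I missing? When I check the logs in Airflow, I see 'None' everywhere instead of the username.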

utils.py (where the session logic lives)

import logging
from airflow.models.log import Log
from airflow.utils.db import create_session
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import RealDictCursor
from plugins.platform.kw_postgres_hook import KwPostgresHook


# To test this use this command:
# airflow tasks test killer_dag killer_query {date} -t '{"pid":"pid_value"}'
# Example :
# airflow tasks test killer_dag killer_query 20210803 -t '{"pid":"12345"}'


def kill_query(**kwargs):
    with create_session() as session:
        triggered_by = (
            session.query(Log.owner)
            .filter(
                Log.dag_id == "killer_dag",
                Log.event == "trigger",
                Log.execution_date == kwargs["execution_date"],
            )
            .limit(1)
            .scalar()
        )
    logging.info(
        f"'{triggered_by}' triggered the Killer_dag. Getting PID for the termination."
    )
    pid = kwargs["params"]["pid"]
    logging.info(f"This PID= '{pid}' is going to be terminated by '{triggered_by}'.")
    analdb_hook = KwPostgresHook(postgres_conn_id="anal_db")
    analdb_conn = analdb_hook.get_conn()
    analdb_cur = analdb_conn.cursor(cursor_factory=RealDictCursor)
    # Termination query receives pid as a parameter from the CLI.
    # It is passed as a bound query parameter instead of being formatted into the SQL string.
    killer_query = "select pg_terminate_backend(%s);"
    logging.info(f"{killer_query} pid={pid}")
    # Making sure the user provided an existing pid.
    # pg_terminate_backend returns a boolean; False means no backend with that pid was terminated.
    analdb_cur.execute(killer_query, (pid,))
    result = analdb_cur.fetchone()
    exists = result["pg_terminate_backend"]
    if exists:
        logging.info(f"The pid = '{pid}' was terminated by '{triggered_by}'.")
    else:
        logging.info(f"The pid = '{pid}' not found, check it again!")
    return exists


def kill_hanging_queries(killer_dag):
    PythonOperator(
        task_id="kill_query",
        python_callable=kill_query,
        dag=killer_dag,
        provide_context=True,
    )

killer_dag.py

from datetime import datetime, timedelta
from airflow.models import DAG
from plugins.platform.utils import skyflow_email_list
from dags.utils.utils import kill_hanging_queries


killer_dag = DAG(
    dag_id="killer_dag",
    default_args={
        "owner": "Data Intelligence: Data Platform",
        "email": skyflow_email_list,
        "email_on_failure": True,
        "email_on_retry": False,
        "depends_on_past": False,
        "start_date": datetime(2021, 8, 8, 0, 0, 0),
        "retries": 0,
        "retry_delay": timedelta(minutes=1),
    },
)
kill_hanging_queries(killer_dag)

You get None because the query doesn't return any results, so scalar() returns None as its default.

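First, if you browse the logs in the Airflow UI (Browse > Audit Logs) and filter by dag_id and event, you will notice that execution_date is always empty and that the datetime is recorded under the Dttm field.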

That is the main reason you get no results: when you filter by Log.execution_date == kwargs["execution_date"], it never matches.
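To see why that equality filter can never match, here is a minimal sketch using Python's built-in sqlite3 module and a hypothetical, simplified `log` table (not Airflow's real schema). In SQL, comparing NULL with anything yields NULL rather than true, so a row whose `execution_date` is empty is never selected by `execution_date = ?`. A query that matches no rows is exactly what makes `scalar()` return None.

```python
import sqlite3

# Hypothetical, simplified stand-in for Airflow's log table (not the real schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE log (dttm TEXT, dag_id TEXT, event TEXT, execution_date TEXT, owner TEXT)"
)
# A "trigger" event as seen in the Audit Logs: execution_date is empty (NULL).
conn.execute(
    "INSERT INTO log VALUES (?, ?, ?, ?, ?)",
    ("2021-08-11 23:05:00", "killer_dag", "trigger", None, "superUser"),
)


def owner_for_execution_date(execution_date):
    """Mimic the question's filter on dag_id, event and execution_date."""
    row = conn.execute(
        "SELECT owner FROM log "
        "WHERE dag_id = ? AND event = 'trigger' AND execution_date = ? LIMIT 1",
        ("killer_dag", execution_date),
    ).fetchone()
    # Like SQLAlchemy's scalar(), return None when no row matched.
    return row[0] if row is not None else None


# NULL = '2021-08-11 23:05:00' evaluates to NULL, not true, so the row is skipped.
print(owner_for_execution_date("2021-08-11 23:05:00"))  # prints None
```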

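So, to get what you are looking for, you can follow an existing example where a similar query is performed. Using it as a reference, you could do something like the following to get the owner of the latest trigger event (which is most likely the run that is actually executing) and avoid filtering on the date altogether: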

triggered_by = (
    session.query(Log.dttm, Log.dag_id, Log.execution_date, Log.owner)
    .filter(Log.dag_id == "killer_dag", Log.event == "trigger")
    .order_by(Log.dttm.desc())
    .first()[3]
)

The query above returns a tuple with the selected fields; owner is at index 3 (the fourth element).
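The same idea in plain SQL, as a sketch against a hypothetical simplified `log` table in an in-memory sqlite3 database (the rows are made-up sample data): keep only the DAG's trigger events, order them by `dttm` descending, and take the first row. Using `sqlite3.Row` lets you read `owner` by name instead of by position, which avoids off-by-one mistakes with the tuple index.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows support access by column name as well as index
conn.execute(
    "CREATE TABLE log (dttm TEXT, dag_id TEXT, event TEXT, execution_date TEXT, owner TEXT)"
)
# Hypothetical sample data.
conn.executemany(
    "INSERT INTO log VALUES (?, ?, ?, ?, ?)",
    [
        ("2021-08-10 10:00:00", "killer_dag", "trigger", None, "alice"),
        ("2021-08-11 23:05:00", "killer_dag", "trigger", None, "superUser"),
        ("2021-08-11 23:06:00", "killer_dag", "tree", None, "bob"),      # not a trigger event
        ("2021-08-12 09:00:00", "other_dag", "trigger", None, "carol"),  # different DAG
    ],
)

row = conn.execute(
    """
    SELECT dttm, dag_id, execution_date, owner
    FROM log
    WHERE dag_id = ? AND event = 'trigger'
    ORDER BY dttm DESC
    LIMIT 1
    """,
    ("killer_dag",),
).fetchone()

# Index 3 is the fourth selected column; reading it by name is clearer.
assert row[3] == row["owner"]
print(row["owner"])  # prints superUser
```

With SQLAlchemy, a row returned by `session.query(Log.dttm, Log.dag_id, Log.execution_date, Log.owner)` can likewise be read as `.owner` instead of `[3]`.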

Output:

[2021-08-11 23:05:29,481] {killer_dag.py:41} INFO - This PID= '123' is going to be terminated by 'superUser'.

EDIT:

Note:

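Keep in mind that if you don't actually trigger the DAG (manually or through the scheduler), there won't be any Log records to query. Running airflow tasks test .. does not create any record with Log.event == "trigger". So before debugging further, make sure there really is a Log entry to query; you can check this by browsing the UI as described above.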

To avoid TypeError: 'NoneType' object is not subscriptable when the query returns no results, you can change the query back to using scalar():

triggered_by = (
    session.query(Log.owner)
    .filter(Log.dag_id == "killer_dag", Log.event == "trigger")
    .order_by(Log.dttm.desc())
    .limit(1)
    .scalar()
)
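As a plain-Python sketch of that failure mode and the fix (stdlib sqlite3, hypothetical simplified `log` table with no trigger rows, as after running only `airflow tasks test`): subscripting an empty result raises the same TypeError, while the guarded lookup returns None. The `"unknown"` fallback label is a hypothetical choice for the log message, not something Airflow provides.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE log (dttm TEXT, dag_id TEXT, event TEXT, execution_date TEXT, owner TEXT)"
)  # no rows: nothing has triggered the DAG yet

QUERY = (
    "SELECT owner FROM log WHERE dag_id = ? AND event = 'trigger' "
    "ORDER BY dttm DESC LIMIT 1"
)


def latest_trigger_owner_unsafe(dag_id):
    # Analogue of .first()[3]: fails when fetchone() returns None.
    return conn.execute(QUERY, (dag_id,)).fetchone()[0]


def latest_trigger_owner(dag_id):
    # Analogue of .limit(1).scalar(): None when there is no matching row.
    row = conn.execute(QUERY, (dag_id,)).fetchone()
    return row[0] if row is not None else None


try:
    latest_trigger_owner_unsafe("killer_dag")
except TypeError as exc:
    print(exc)  # 'NoneType' object is not subscriptable

triggered_by = latest_trigger_owner("killer_dag") or "unknown"  # hypothetical fallback label
print(triggered_by)  # prints unknown
```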

Let me know if this works for you!