Airflow - 记录触发 dag 的用户
Airflow - log the user who triggered the dag
我尝试在终止 postgres 挂起查询的 Airflow 中记录触发我的 DAG 的用户,但它不起作用。你能帮忙看看哪里出了问题吗?我错过了什么?当我检查气流中的日志而不是用户名时,到处都是 'None'。
utils.py(描述会话逻辑的地方)
import logging
from airflow.models.log import Log
from airflow.utils.db import create_session
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import RealDictCursor
from plugins.platform.kw_postgres_hook import KwPostgresHook
# To test this use this command:
# airflow tasks test killer_dag killer_query {date} -t '{"pid":"pid_value"}'
# Example :
# airflow tasks test killer_dag killer_query 20210803 -t '{"pid":"12345"}'
def kill_query(**kwargs):
with create_session() as session:
triggered_by = (
session.query(Log.owner)
.filter(
Log.dag_id == "killer_dag",
Log.event == "trigger",
Log.execution_date == kwargs["execution_date"],
)
.limit(1)
.scalar()
)
logging.info(
f"'{triggered_by}' triggered the Killer_dag. Getting PID for the termination."
)
pid = kwargs["params"]["pid"]
logging.info(f"This PID= '{pid}' is going to be terminated by '{triggered_by}'.")
analdb_hook = KwPostgresHook(postgres_conn_id="anal_db")
analdb_conn = analdb_hook.get_conn()
analdb_cur = analdb_conn.cursor(cursor_factory=RealDictCursor)
# Termination query receives pid as a parameter from cli
killer_query = f"""
select pg_terminate_backend('{pid}');
"""
logging.info(killer_query)
# Making sure the user provides existing pid.
# In this part the boolean result of terminating select is checked and if False error is raised.
analdb_cur.execute(killer_query)
result = analdb_cur.fetchone()
exists = result["pg_terminate_backend"]
if exists == True:
logging.info(f"The pid = '{pid}' was terminated by '{triggered_by}'.")
else:
logging.info(f"The pid = '{pid}' not found, check it again!")
return exists
def kill_hanging_queries(killer_dag):
PythonOperator(
task_id="kill_query",
python_callable=kill_query,
dag=killer_dag,
provide_context=True,
)
killer_dag.py
from datetime import datetime, timedelta
from airflow.models import DAG
from plugins.platform.utils import skyflow_email_list
from dags.utils.utils import kill_hanging_queries
killer_dag = DAG(
dag_id="killer_dag",
default_args={
"owner": "Data Intelligence: Data Platform",
"email": skyflow_email_list,
"email_on_failure": True,
"email_on_retry": False,
"depends_on_past": False,
"start_date": datetime(2021, 8, 8, 0, 0, 0),
"retries": 0,
"retry_delay": timedelta(minutes=1),
},
)
kill_hanging_queries(killer_dag)
你得到 None
因为查询没有 return 任何结果,所以 scalar()
returns None
作为默认值。
首先,如果您从 Airflow UI 浏览日志(浏览 > 审核日志)并按 dag_id
和 [=16 进行过滤=] 你会注意到 execution_date
总是空的,日期时间注册在 Dttm
字段下:
这就是您得不到结果的主要原因,因为当您按 Log.execution_date == kwargs["execution_date"]
过滤时,永远不会匹配。
因此,为了实现您正在寻找的内容,您可以关注 正在执行类似查询的位置。以此为源,你可以做类似下面的事情来获得最后一个 trigger 事件的 owner
(这很可能是实际上的执行 运行) 并避免将日期作为过滤器处理。
triggered_by = (
session.query(Log.dttm, Log.dag_id, Log.execution_date, Log.owner)
.filter(Log.dag_id == "killer_dag", Log.event == "trigger")
.order_by(Log.dttm.desc())
.first()[3]
)
上面的 return 是一个包含所需字段的元组,是 owner
中的第三个。
输出:
[2021-08-11 23:05:29,481] {killer_dag.py:41} INFO - This PID= '123' is going to be terminated by 'superUser'.
编辑:
注意:
请记住,如果您实际上没有触发 DAG(手动或通过调度程序),将不会有任何 Log
查询。 运行 airflow tasks test ..
不会用 Log.event == "trigger"
创建任何记录。所以在进一步调试之前,确保确实存在一个 Log
条目来查询,你可以通过浏览 UI 来完成,如上所述。
为了避免 TypeError: 'NoneType' object is not subscriptable
当查询中没有结果时,您可以将查询更改为再次使用 scalar()
:
triggered_by = (
session.query(Log.owner)
.filter(Log.dag_id == "killer_dag", Log.event == "trigger")
.order_by(Log.dttm.desc())
.limit(1)
.scalar()
)
让我知道这是否对您有用!
我尝试在终止 postgres 挂起查询的 Airflow 中记录触发我的 DAG 的用户,但它不起作用。你能帮忙看看哪里出了问题吗?我错过了什么?当我检查气流中的日志而不是用户名时,到处都是 'None'。
utils.py(描述会话逻辑的地方)
import logging
from airflow.models.log import Log
from airflow.utils.db import create_session
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import RealDictCursor
from plugins.platform.kw_postgres_hook import KwPostgresHook
# To test this use this command:
# airflow tasks test killer_dag killer_query {date} -t '{"pid":"pid_value"}'
# Example :
# airflow tasks test killer_dag killer_query 20210803 -t '{"pid":"12345"}'
def kill_query(**kwargs):
with create_session() as session:
triggered_by = (
session.query(Log.owner)
.filter(
Log.dag_id == "killer_dag",
Log.event == "trigger",
Log.execution_date == kwargs["execution_date"],
)
.limit(1)
.scalar()
)
logging.info(
f"'{triggered_by}' triggered the Killer_dag. Getting PID for the termination."
)
pid = kwargs["params"]["pid"]
logging.info(f"This PID= '{pid}' is going to be terminated by '{triggered_by}'.")
analdb_hook = KwPostgresHook(postgres_conn_id="anal_db")
analdb_conn = analdb_hook.get_conn()
analdb_cur = analdb_conn.cursor(cursor_factory=RealDictCursor)
# Termination query receives pid as a parameter from cli
killer_query = f"""
select pg_terminate_backend('{pid}');
"""
logging.info(killer_query)
# Making sure the user provides existing pid.
# In this part the boolean result of terminating select is checked and if False error is raised.
analdb_cur.execute(killer_query)
result = analdb_cur.fetchone()
exists = result["pg_terminate_backend"]
if exists == True:
logging.info(f"The pid = '{pid}' was terminated by '{triggered_by}'.")
else:
logging.info(f"The pid = '{pid}' not found, check it again!")
return exists
def kill_hanging_queries(killer_dag):
PythonOperator(
task_id="kill_query",
python_callable=kill_query,
dag=killer_dag,
provide_context=True,
)
killer_dag.py
from datetime import datetime, timedelta
from airflow.models import DAG
from plugins.platform.utils import skyflow_email_list
from dags.utils.utils import kill_hanging_queries
killer_dag = DAG(
dag_id="killer_dag",
default_args={
"owner": "Data Intelligence: Data Platform",
"email": skyflow_email_list,
"email_on_failure": True,
"email_on_retry": False,
"depends_on_past": False,
"start_date": datetime(2021, 8, 8, 0, 0, 0),
"retries": 0,
"retry_delay": timedelta(minutes=1),
},
)
kill_hanging_queries(killer_dag)
你得到 None
因为查询没有 return 任何结果,所以 scalar()
returns None
作为默认值。
首先,如果您从 Airflow UI 浏览日志(浏览 > 审核日志)并按 dag_id
和 [=16 进行过滤=] 你会注意到 execution_date
总是空的,日期时间注册在 Dttm
字段下:
这就是您得不到结果的主要原因,因为当您按 Log.execution_date == kwargs["execution_date"]
过滤时,永远不会匹配。
因此,为了实现您正在寻找的内容,您可以关注 owner
(这很可能是实际上的执行 运行) 并避免将日期作为过滤器处理。
triggered_by = (
session.query(Log.dttm, Log.dag_id, Log.execution_date, Log.owner)
.filter(Log.dag_id == "killer_dag", Log.event == "trigger")
.order_by(Log.dttm.desc())
.first()[3]
)
上面的 return 是一个包含所需字段的元组,是 owner
中的第三个。
输出:
[2021-08-11 23:05:29,481] {killer_dag.py:41} INFO - This PID= '123' is going to be terminated by 'superUser'.
编辑:
注意:
请记住,如果您实际上没有触发 DAG(手动或通过调度程序),将不会有任何 Log
查询。 运行 airflow tasks test ..
不会用 Log.event == "trigger"
创建任何记录。所以在进一步调试之前,确保确实存在一个 Log
条目来查询,你可以通过浏览 UI 来完成,如上所述。
为了避免 TypeError: 'NoneType' object is not subscriptable
当查询中没有结果时,您可以将查询更改为再次使用 scalar()
:
triggered_by = (
session.query(Log.owner)
.filter(Log.dag_id == "killer_dag", Log.event == "trigger")
.order_by(Log.dttm.desc())
.limit(1)
.scalar()
)
让我知道这是否对您有用!