气流测试-如何从cli传递参数

Airflow test - how to pass parameters from cli

我不知道如何继续这个任务。能否请你帮忙?我正在创建一个 dag,用户将在其中手动传递参数。这个 dag 应该使用“pid”来终止 postgres 数据库中的挂起查询。 我已经编写了代码,但无法通过 cli 传递参数来对其进行测试。 我正在使用这个命令: 气流任务测试 killer_dag get_idle_queries 20210802 -t '{"pid":"12345"}

这是代码: killer_dag.py

from airflow.models import DAG
from plugins.platform.utils import skyflow_email_list
from dags.utils.utils import (kill_hanging_queries,)
from airflow.models.log import Log
from airflow.utils.db import create_session
with create_session() as session:
    results = session.query(Log.dttm, Log.dag_id, Log.execution_date,
                            Log.owner, Log.extra) \
        .filter(Log.dag_id == 'killer_dag', Log.event ==
                'trigger').order_by(Log.dttm.desc()).all()
killer_dag = DAG(
    dag_id="killer_dag",
    default_args={
        "owner": "Data Intelligence: Data Platform",
        "email": skyflow_email_list,
        "email_on_failure": True,
        "email_on_retry": False,
        "depends_on_past": False,
        "start_date": datetime(2021, 8, 1, 0, 0, 0),
        "retries": 10,
        "retry_delay": timedelta(minutes=1),
        "sla": timedelta(minutes=90),
    },
    schedule_interval=timedelta(days=1),
)
kill_hanging_queries(killer_dag) 

utils.py

import logging
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import RealDictCursor
from plugins.platform.kw_postgres_hook import KwPostgresHook
from airflow.models import DagRun
from airflow.providers.postgres.hooks.postgres import PostgresHook

def get_idle_queries(**kwargs):
    logging.info(f"STARTING TO FETCH THE PID")
    logging.info(kwargs)
    pid= kwargs["pid"]
    logging.info(pid)
    logging.info("received pid: ", pid)
    # return 'Whatever you return gets printed in the logs'
    analdb_hook = KwPostgresHook(postgres_conn_id="anal_db")
    analdb_conn = analdb_hook.get_conn()
    analdb_cur = analdb_conn.cursor(cursor_factory=RealDictCursor)
    get_idle_queries_query = """
        SELECT pg_terminate_backend('{pid}');
    """
    analdb_cur.execute(get_idle_queries_query)
    hanging_queries = analdb_cur.fetchall()
    logging.info(f"Listing info about hanging queries {hanging_queries}")  # NORO KODO STARTO
    for record in hanging_queries:
        query = record["terminate_q"]
        logging.info(f"Running query: {query}")
        analdb_cur.execute(query)
    analdb_conn.close()
def kill_hanging_queries(killer_dag):
    PythonOperator(
        task_id="get_idle_queries",
        python_callable=get_idle_queries,
        dag=killer_dag,
        provide_context=True
    ) ```

要从 CLI 传递参数,正确的方法几乎就是您的做法(除非您真的像上面的 post 那样错过了结尾 '):

airflow tasks test killer_dag get_idle_queries 20210802 -t '{"pid":"12345"}'

所以我认为您代码中的问题与您尝试访问这些参数的方式有关。在 get_idle_queries 中,您可以通过 kwargs["params"]["pid"]} 访问它们,如下所示:

def get_idle_queries(**kwargs):
    logging.info(f"STARTING TO FETCH THE PID")
    logging.info(kwargs)
    pid = kwargs["params"]["pid"]

如果这对你有用,请告诉我。