Airflow API payload as input to a Python script

I am triggering an Airflow DAG with the following Python script. API call:

import requests
import json

url = "http://127.0.0.1:8080/api/v1/dags/get_external_params/dagRuns"

payload = json.dumps({
  "conf": {
    "ip": "198.51.100.96",
    "hostname": "abc",
    "domainname": "server456.com",
    "username": "airflow",
    "password": "XXXX"
  }
})
headers = {
  'Content-Type': 'application/json',
  'Authorization': 'Basic XXXX'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

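I am trying to print all of the conf values at once instead of printing them one by one. The examples I have seen only send a single key-value pair and print that. DAG: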

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from airflow.utils.dates import days_ago

def run_this_func(**kwargs):
    dag_run = kwargs.get('dag_run')
    parameters = dag_run.conf['ip'] #get all parameters instead of one key
    return parameters

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='get_external_params',
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['DNS'],
) as dag:


   run_this = PythonOperator(
     task_id='run_this',
     python_callable=run_this_func,
     dag=dag,
     #op_kwargs = How can I get all the ip,hostname,username,password here
     provide_context=True,
   )
   run_this

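My goal is to call a Python script with arguments, for example: somename.py -username xxxx -password xxx. Before that, though, I am trying to get the values inside the Airflow script. Please help!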

IMHO, the simplest way is to use params rather than op_kwargs. Consider this example request:

import requests
import json

url = "http://localhost:8080/api/v1/dags/api_triggered_with_params/dagRuns"

payload = json.dumps({
  "conf": {
    "param_via_API": "triggered from stable API"
  }
})
headers = {
  'Content-Type': 'application/json',
  'Authorization': 'Basic xxxx'
}

response = requests.request("POST", url, headers=headers, data=payload)

print(response.text)

DAG:

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def _print_params(**kwargs):
    print(f"Task_id: {kwargs['ti'].task_id}")
    for k, v in kwargs["params"].items():
        print(f"{k}:{v}")

dag = DAG(
    dag_id="api_triggered_with_params",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
    tags=["example_dags"],
    params={"param_at_dag_level": "param_66"},
    catchup=False,
)
with dag:

    python_task = PythonOperator(
        task_id="python_task",
        python_callable=_print_params,
    )
    python_task_2 = PythonOperator(
        task_id="python_task_2",
        python_callable=_print_params,
        params={"param4": "param defined at task level"},
    )

chain(python_task, python_task_2)

Inside the Python callable you can access any parameter defined at any level (DAG level, task level, or in the conf supplied when the DagRun is triggered).

Output logs:

python_task

[2021-07-28 13:17:24,716] {logging_mixin.py:104} INFO - Task_id: python_task
[2021-07-28 13:17:24,717] {logging_mixin.py:104} INFO - param_at_dag_level:param_66
[2021-07-28 13:17:24,717] {logging_mixin.py:104} INFO - param_via_API:triggered from stable API
[2021-07-28 13:17:24,717] {python.py:151} INFO - Done. Returned value was: None

python_task_2

[2021-07-28 13:17:26,034] {logging_mixin.py:104} INFO - Task_id: python_task_2
[2021-07-28 13:17:26,034] {logging_mixin.py:104} INFO - param_at_dag_level:param_66
[2021-07-28 13:17:26,035] {logging_mixin.py:104} INFO - param4:param defined at task level
[2021-07-28 13:17:26,035] {logging_mixin.py:104} INFO - param_via_API:triggered from stable API
[2021-07-28 13:17:26,035] {python.py:151} INFO - Done. Returned value was: None
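
The logs above are consistent with the three levels being layered into a single dictionary before the callable sees it. As a rough mental model only (plain Python, not Airflow's actual implementation), the merge can be sketched like this. The helper name merge_params is hypothetical, and whether conf values win when keys collide is an assumption here; in Airflow that behaviour is governed by configuration (the dag_run_conf_overrides_params option), so check your own setup.

```python
def merge_params(dag_params, task_params, run_conf):
    """Layer the three parameter sources into one dict (illustrative only)."""
    merged = dict(dag_params)        # DAG-level params form the base
    merged.update(task_params)       # task-level params are layered on top
    merged.update(run_conf or {})    # conf from the trigger request comes last (assumed)
    return merged


merged = merge_params(
    {"param_at_dag_level": "param_66"},
    {"param4": "param defined at task level"},
    {"param_via_API": "triggered from stable API"},
)

# Same loop as _print_params, applied to the merged dict
for k, v in merged.items():
    print(f"{k}:{v}")
```

With the values from python_task_2 this yields the same three keys that appear in its log.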

Let me know if that works for you!

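I'm not 100% sure I understood your question correctly, but dag_run.conf is a dict ({'ip': '198.51.100.96', 'hostname': 'abc', 'domainname': 'server456.com', 'username': '***', 'password': 'XXXX'}), so you can extract whichever keys you like from it. If you want shorter names, though, there is no way around assigning the keys/values to variables yourself: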

parameters = dag_run.conf
ip = parameters["ip"]
hostname = parameters["hostname"]
domainname = parameters["domainname"]

You can also print the complete object. Full DAG:

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(dag_id="demo", start_date=days_ago(3), schedule_interval=None)


def _do_magic(**context):
    print(context["dag_run"].conf)
    # Will print "{'ip': '198.51.100.96', 'hostname': 'abc', 'domainname': 'server456.com', 'username': '***', 'password': 'XXXX'}"


do_magic = PythonOperator(task_id="do_magic", python_callable=_do_magic, dag=dag)
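
Since the stated goal is to call somename.py with the values as arguments, the same conf dict could be turned into a command line inside the callable. The sketch below is one way to do that under stated assumptions: build_command is a hypothetical helper, and the "--key value" flag style is a guess at what the target script accepts, so adjust it to the script's real interface.

```python
import shlex


def build_command(script, conf):
    """Turn a conf dict into an argument list of the form --key value."""
    argv = ["python", script]
    for key, value in (conf or {}).items():  # tolerate a missing conf
        argv += [f"--{key}", str(value)]
    return argv


cmd = build_command("somename.py", {"username": "airflow", "password": "XXXX"})

# shlex.join (Python 3.8+) renders the list as a shell-quoted string for logging
print(shlex.join(cmd))
```

The resulting list can be handed to subprocess.run(cmd, check=True) from within the task; passing a list rather than a single string avoids shell quoting problems.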

Since op_kwargs is a templated field, you can also supply the values as Jinja-templated strings:

from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(dag_id="demo", start_date=days_ago(3), schedule_interval=None)


def _do_magic(ip):
    print(ip)
    # Will print 198.51.100.96


do_magic = PythonOperator(
    task_id="do_magic",
    python_callable=_do_magic,
    op_kwargs={"ip": "{{ dag_run.conf['ip'] }}"},
    dag=dag,
)
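
To pass all of the conf values this way rather than only ip, the op_kwargs dict can be generated from a list of key names. This is a sketch: the KEYS list is an assumption taken from the payload in the question, and the callable then needs a matching parameter for each key (or **kwargs). Values rendered from these templates arrive as strings.

```python
# Keys expected in the trigger payload (assumed from the question's request)
KEYS = ["ip", "hostname", "domainname", "username", "password"]

# One Jinja template string per key, e.g. "{{ dag_run.conf['ip'] }}"
op_kwargs = {key: "{{ dag_run.conf['" + key + "'] }}" for key in KEYS}

for name, template in op_kwargs.items():
    print(name, "->", template)
```

The resulting dict can be passed as op_kwargs=op_kwargs to the PythonOperator, with the callable declared as, for example, def _do_magic(ip, hostname, domainname, username, password).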

Does this answer your question?