Airflow:如何将变量 obtenida de mi DB 传递给 SimpleHttpOperator 函数

Airflow: how to pass a variable obtenida de mi DB to SimpleHttpOperator function

我开始使用 Airflow。我需要从我的 PostgreSQL 数据库获取访问令牌,然后我必须使用该访问令牌通过 SimpleHttpOperator 函数查询 API。

这是我的代码:

from airflow.models import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python import PythonOperator

from datetime import datetime
import json


default_args = {
    'start_date':datetime(2021, 1, 1)
}

def _get_access_token():
    request = "SELECT access_token FROM access_token"
    postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
    connection = postgres_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    jobs = cursor.fetchall()
    access_token = ([i[0] for i in jobs])

    return access_token


with DAG('get_broadworks_subscribers', schedule_interval='@once',
    default_args = default_args,
    catchup=False) as dag:

    # Tasks

    get_access_token = PythonOperator(
    task_id='get_access_token', 
    python_callable=_get_access_token
    )

    get_subscribers_list = SimpleHttpOperator(
        task_id = 'get_subscribers_list',
        http_conn_id = 'webex',
        endpoint = 'v1/broadworks/subscribers/',
        method = 'GET',
        authorization = "Bearer" + " " + access_token[0],
        headers = {
            "Authorization": "authorization"
        },
        response_filter = lambda response: json.loads(response.text),
        log_response = True
    )

get_access_token >> get_subscribers_list

我收到以下错误:

    authorization = "Bearer" + " " + access_token[0],
NameError: name 'access_token' is not defined

希望您能帮帮我,在此先感谢您。

您可能期望 python 函数将 return 该值稍后在您的代码中使用。这不是 Airflow 的工作方式。任务之间不共享数据任务可以通过 Xcom.

共享元数据

PythonOperator return 值被推送到 xcom(元存储中的 table)。然后下游任务可以读取该值并在字段被模板化时使用它。 SimpleHttpOperator.

中也没有 authorization 参数

所以你的代码可以是这样的:

get_subscribers_list = SimpleHttpOperator(
    task_id = 'get_subscribers_list',
    http_conn_id = 'webex',
    endpoint = 'v1/broadworks/subscribers/',
    method = 'GET',
    headers = {
        "Authorization": """Bearer {{ task_instance.xcom_pull(task_ids="get_access_token") }} """
    },
    response_filter = lambda response: json.loads(response.text),
    log_response = True
)

由于 headerstemplated 您可以从上游任务中提取 xcom 值。

注意:我不建议这样传递令牌。您可能需要考虑将其安全地存储在 Airflow Variable 中。它还可以避免您在单独的任务中从数据库中查询它的麻烦。如果您将它存储在变量中,您需要更改的是:

    headers = {
        "Authorization": """Bearer {{ var.value.get('my_var_name') }} """
    }

请注意,如果键包含任何 'password', 'secret', 'passwd', 'authorization', 'api_key', 'apikey', 'access_token',Airflow 会自动屏蔽值,但是如果您选择使用不包含任何这些的键,您仍然可以隐藏它,如果您将字符串添加到airflow.cfg 中的 sensitive_var_conn_names 有关此内容的更多信息,请参阅 docs