Airflow:如何将变量 obtenida de mi DB 传递给 SimpleHttpOperator 函数
Airflow: how to pass a variable obtenida de mi DB to SimpleHttpOperator function
我开始使用 Airflow。我需要从我的 PostgreSQL 数据库获取访问令牌,然后我必须使用该访问令牌通过 SimpleHttpOperator 函数查询 API。
这是我的代码:
from airflow.models import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python import PythonOperator
from datetime import datetime
import json
default_args = {
'start_date':datetime(2021, 1, 1)
}
def _get_access_token():
request = "SELECT access_token FROM access_token"
postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
connection = postgres_hook.get_conn()
cursor = connection.cursor()
cursor.execute(request)
jobs = cursor.fetchall()
access_token = ([i[0] for i in jobs])
return access_token
with DAG('get_broadworks_subscribers', schedule_interval='@once',
default_args = default_args,
catchup=False) as dag:
# Tasks
get_access_token = PythonOperator(
task_id='get_access_token',
python_callable=_get_access_token
)
get_subscribers_list = SimpleHttpOperator(
task_id = 'get_subscribers_list',
http_conn_id = 'webex',
endpoint = 'v1/broadworks/subscribers/',
method = 'GET',
authorization = "Bearer" + " " + access_token[0],
headers = {
"Authorization": "authorization"
},
response_filter = lambda response: json.loads(response.text),
log_response = True
)
get_access_token >> get_subscribers_list
我收到以下错误:
authorization = "Bearer" + " " + access_token[0],
NameError: name 'access_token' is not defined
希望您能帮帮我,在此先感谢您。
您可能期望 python 函数将 return 该值稍后在您的代码中使用。这不是 Airflow 的工作方式。任务之间不共享数据任务可以通过 Xcom.
共享元数据
PythonOperator
return 值被推送到 xcom(元存储中的 table)。然后下游任务可以读取该值并在字段被模板化时使用它。 SimpleHttpOperator
.
中也没有 authorization
参数
所以你的代码可以是这样的:
get_subscribers_list = SimpleHttpOperator(
task_id = 'get_subscribers_list',
http_conn_id = 'webex',
endpoint = 'v1/broadworks/subscribers/',
method = 'GET',
headers = {
"Authorization": """Bearer {{ task_instance.xcom_pull(task_ids="get_access_token") }} """
},
response_filter = lambda response: json.loads(response.text),
log_response = True
)
由于 headers
是 templated 您可以从上游任务中提取 xcom 值。
注意:我不建议这样传递令牌。您可能需要考虑将其安全地存储在 Airflow Variable 中。它还可以避免您在单独的任务中从数据库中查询它的麻烦。如果您将它存储在变量中,您需要更改的是:
headers = {
"Authorization": """Bearer {{ var.value.get('my_var_name') }} """
}
请注意,如果键包含任何 'password', 'secret', 'passwd', 'authorization', 'api_key', 'apikey', 'access_token'
,Airflow 会自动屏蔽值,但是如果您选择使用不包含任何这些的键,您仍然可以隐藏它,如果您将字符串添加到airflow.cfg 中的 sensitive_var_conn_names
有关此内容的更多信息,请参阅 docs。
我开始使用 Airflow。我需要从我的 PostgreSQL 数据库获取访问令牌,然后我必须使用该访问令牌通过 SimpleHttpOperator 函数查询 API。
这是我的代码:
from airflow.models import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python import PythonOperator
from datetime import datetime
import json
default_args = {
'start_date':datetime(2021, 1, 1)
}
def _get_access_token():
request = "SELECT access_token FROM access_token"
postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
connection = postgres_hook.get_conn()
cursor = connection.cursor()
cursor.execute(request)
jobs = cursor.fetchall()
access_token = ([i[0] for i in jobs])
return access_token
with DAG('get_broadworks_subscribers', schedule_interval='@once',
default_args = default_args,
catchup=False) as dag:
# Tasks
get_access_token = PythonOperator(
task_id='get_access_token',
python_callable=_get_access_token
)
get_subscribers_list = SimpleHttpOperator(
task_id = 'get_subscribers_list',
http_conn_id = 'webex',
endpoint = 'v1/broadworks/subscribers/',
method = 'GET',
authorization = "Bearer" + " " + access_token[0],
headers = {
"Authorization": "authorization"
},
response_filter = lambda response: json.loads(response.text),
log_response = True
)
get_access_token >> get_subscribers_list
我收到以下错误:
authorization = "Bearer" + " " + access_token[0],
NameError: name 'access_token' is not defined
希望您能帮帮我,在此先感谢您。
您可能期望 python 函数将 return 该值稍后在您的代码中使用。这不是 Airflow 的工作方式。任务之间不共享数据任务可以通过 Xcom.
共享元数据PythonOperator
return 值被推送到 xcom(元存储中的 table)。然后下游任务可以读取该值并在字段被模板化时使用它。 SimpleHttpOperator
.
authorization
参数
所以你的代码可以是这样的:
get_subscribers_list = SimpleHttpOperator(
task_id = 'get_subscribers_list',
http_conn_id = 'webex',
endpoint = 'v1/broadworks/subscribers/',
method = 'GET',
headers = {
"Authorization": """Bearer {{ task_instance.xcom_pull(task_ids="get_access_token") }} """
},
response_filter = lambda response: json.loads(response.text),
log_response = True
)
由于 headers
是 templated 您可以从上游任务中提取 xcom 值。
注意:我不建议这样传递令牌。您可能需要考虑将其安全地存储在 Airflow Variable 中。它还可以避免您在单独的任务中从数据库中查询它的麻烦。如果您将它存储在变量中,您需要更改的是:
headers = {
"Authorization": """Bearer {{ var.value.get('my_var_name') }} """
}
请注意,如果键包含任何 'password', 'secret', 'passwd', 'authorization', 'api_key', 'apikey', 'access_token'
,Airflow 会自动屏蔽值,但是如果您选择使用不包含任何这些的键,您仍然可以隐藏它,如果您将字符串添加到airflow.cfg 中的 sensitive_var_conn_names
有关此内容的更多信息,请参阅 docs。