如何在 Airflow 中传递不记名令牌
How to pass bearer token in the Airflow
我的工作有 3 个任务
1) 使用 POST 请求获取令牌
2)获取令牌值并存储在变量中
3) 使用步骤 2 中的令牌发出 GET 请求并传递承载令牌
问题是第 3 步不起作用,我收到 HTTP 错误。我能够在步骤 2 中打印 token 的值并在代码中验证
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
token ="mytoken" //defined with some value which will be updated later
get_token = SimpleHttpOperator(
task_id='get_token',
method='POST',
headers={"Authorization": "Basic xxxxxxxxxxxxxxx=="},
endpoint='/token?username=user&password=pass&grant_type=password',
http_conn_id = 'test_http',
trigger_rule="all_done",
xcom_push=True,
dag=dag
)
def pull_function(**context):
value = context['task_instance'].xcom_pull(task_ids='get_token')
print("printing token")
print value
wjdata = json.loads(value)
print(wjdata['access_token'])
token=wjdata['access_token']
print token
run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=pull_function,
dag=dag,
)
get_config = SimpleHttpOperator(
task_id='get_config',
method='GET',
headers={"Authorization": "Bearer " + token},
endpoint='someendpoint',
http_conn_id = 'test_conn',
trigger_rule="all_done",
xcom_push=True,
dag=dag
)
get_token >> run_this >> get_config
您将 token
存储为 "global" 变量的方式将不起作用。 Dag 定义文件(您定义任务的脚本)与执行每个任务的 运行 时间上下文不同。每个任务都可以 运行 在单独的线程、进程中,甚至在另一台机器上,这取决于执行者。您在任务之间传递数据的方式不是通过全局变量,而是使用 XCom——您已经部分这样做了。
尝试以下操作:
- 远程全局 token
变量
- 在 pull_function
而不是 print token
中执行 return token
- 这会将值再次推送到 XCom,以便下一个任务可以访问它
- 在下一个任务中从 XCom 访问令牌。
最后一步有点棘手,因为您使用的是 SimpleHttpOperator
,它的模板字段只有 endpoint
和 data
,而不是 headers
。
例如,如果你想从上一个任务的 XCom 中传递一些 data
,你会做这样的事情:
get_config = SimpleHttpOperator(
task_id='get_config',
endpoint='someendpoint',
http_conn_id = 'test_conn',
dag=dag,
data='{{ task_instance.xcom_pull(task_ids="print_the_context", key="some_key") }}'
)
但不幸的是,您不能对 headers 执行相同的操作,因此您必须通过 PythonOperator "manually" 执行此操作,或者您可以继承 SimpleHttpOperator
并创建您的自己的,比如:
class HeaderTemplatedHttpOperator(SimpleHttpOperator):
template_fields = ('endpoint', 'data', 'headers') # added 'headers' headers
然后使用那个,例如:
get_config = HeaderTemplatedHttpOperator(
task_id='get_config',
endpoint='someendpoint',
http_conn_id = 'test_conn',
dag=dag,
headers='{{ task_instance.xcom_pull(task_ids="print_the_context") }}'
)
请记住,我没有对此进行测试,只是为了解释这个概念。试一试这个方法,你应该会到达那里。
我的工作有 3 个任务 1) 使用 POST 请求获取令牌 2)获取令牌值并存储在变量中 3) 使用步骤 2 中的令牌发出 GET 请求并传递承载令牌
问题是第 3 步不起作用,我收到 HTTP 错误。我能够在步骤 2 中打印 token 的值并在代码中验证
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
token ="mytoken" //defined with some value which will be updated later
get_token = SimpleHttpOperator(
task_id='get_token',
method='POST',
headers={"Authorization": "Basic xxxxxxxxxxxxxxx=="},
endpoint='/token?username=user&password=pass&grant_type=password',
http_conn_id = 'test_http',
trigger_rule="all_done",
xcom_push=True,
dag=dag
)
def pull_function(**context):
value = context['task_instance'].xcom_pull(task_ids='get_token')
print("printing token")
print value
wjdata = json.loads(value)
print(wjdata['access_token'])
token=wjdata['access_token']
print token
run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=pull_function,
dag=dag,
)
get_config = SimpleHttpOperator(
task_id='get_config',
method='GET',
headers={"Authorization": "Bearer " + token},
endpoint='someendpoint',
http_conn_id = 'test_conn',
trigger_rule="all_done",
xcom_push=True,
dag=dag
)
get_token >> run_this >> get_config
您将 token
存储为 "global" 变量的方式将不起作用。 Dag 定义文件(您定义任务的脚本)与执行每个任务的 运行 时间上下文不同。每个任务都可以 运行 在单独的线程、进程中,甚至在另一台机器上,这取决于执行者。您在任务之间传递数据的方式不是通过全局变量,而是使用 XCom——您已经部分这样做了。
尝试以下操作:
- 远程全局 token
变量
- 在 pull_function
而不是 print token
中执行 return token
- 这会将值再次推送到 XCom,以便下一个任务可以访问它
- 在下一个任务中从 XCom 访问令牌。
最后一步有点棘手,因为您使用的是 SimpleHttpOperator
,它的模板字段只有 endpoint
和 data
,而不是 headers
。
例如,如果你想从上一个任务的 XCom 中传递一些 data
,你会做这样的事情:
get_config = SimpleHttpOperator(
task_id='get_config',
endpoint='someendpoint',
http_conn_id = 'test_conn',
dag=dag,
data='{{ task_instance.xcom_pull(task_ids="print_the_context", key="some_key") }}'
)
但不幸的是,您不能对 headers 执行相同的操作,因此您必须通过 PythonOperator "manually" 执行此操作,或者您可以继承 SimpleHttpOperator
并创建您的自己的,比如:
class HeaderTemplatedHttpOperator(SimpleHttpOperator):
template_fields = ('endpoint', 'data', 'headers') # added 'headers' headers
然后使用那个,例如:
get_config = HeaderTemplatedHttpOperator(
task_id='get_config',
endpoint='someendpoint',
http_conn_id = 'test_conn',
dag=dag,
headers='{{ task_instance.xcom_pull(task_ids="print_the_context") }}'
)
请记住,我没有对此进行测试,只是为了解释这个概念。试一试这个方法,你应该会到达那里。