Airflow XCom not getting resolved, returns the task_instance string
I'm running into a strange issue with `xcom_pull`: it always returns the literal `xcom_pull` template string instead of its value:
"{{ task_instance.xcom_pull(dag_id = 'cf_test',task_ids='get_config_val',key='http_con_id') }}"
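My requirement is simple. I push an XCom from a `PythonOperator` using `xcom_push`, and I'm trying to retrieve that value with `xcom_pull` and pass it as the `http_conn_id` of a `SimpleHttpOperator`. However, the variable comes back as a plain string instead of the resolved `xcom_pull` value.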
The `PythonOperator` pushes the XCom successfully.

Code:
```python
from datetime import datetime

import simplejson as json
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from google.auth.transport.requests import Request

default_airflow_args = {
    "owner": "divyaansh",
    "depends_on_past": False,
    "start_date": datetime(2022, 5, 18),
    "retries": 0,
    "schedule_interval": "@hourly",
}

project_configs = {
    "project_id": "test",
    "conn_id": "google_cloud_storage_default",
    "bucket_name": "test-transfer",
    "folder_name": "processed-test-rdf",
}


def get_config_vals(**kwargs) -> dict:
    """
    Get config vals from Airflow variables and store them as XComs
    """
    task_instance = kwargs["task_instance"]
    task_instance.xcom_push(key="http_con_id", value="gcp_cloud_function")


def generate_api_token(cf_name: str):
    """
    Generate a token for the API request
    """
    import google.oauth2.id_token

    request = Request()
    target_audience = f"https://us-central1-test-a2h.cloudfunctions.net/{cf_name}"
    return google.oauth2.id_token.fetch_id_token(
        request=request, audience=target_audience
    )


with DAG(
    dag_id="cf_test",
    default_args=default_airflow_args,
    catchup=False,
    render_template_as_native_obj=True,
) as dag:
    start = DummyOperator(task_id="start")

    config_vals = PythonOperator(
        task_id="get_config_val", python_callable=get_config_vals, provide_context=True
    )

    ip_data = json.dumps(
        {
            "bucket_name": project_configs["bucket_name"],
            "file_name": "dummy",
            "target_location": "/valid",
        }
    )

    conn_id = "{{ task_instance.xcom_pull(dag_id = 'cf_test',task_ids='get_config_val',key='http_con_id') }}"
    api_token = generate_api_token("new-cp")

    cf_task = SimpleHttpOperator(
        task_id="file_decrypt_and_validate_cf",
        http_conn_id=conn_id,
        method="POST",
        endpoint="new-cp",
        data=json.dumps(
            json.dumps(
                {
                    "bucket_name": "test-transfer",
                    "file_name": [
                        "processed-test-rdf/dummy_20220501.txt",
                        "processed-test-rdf/dummy_20220502.txt",
                    ],
                    "target_location": "/valid",
                }
            )
        ),
        headers={
            "Authorization": f"bearer {api_token}",
            "Content-Type": "application/json",
        },
        do_xcom_push=True,
        log_response=True,
    )
    print("task new-cp", cf_task)

    check_flow = DummyOperator(task_id="check_flow")
    end = DummyOperator(task_id="end")

    start >> config_vals >> cf_task >> check_flow >> end
```
Error message:

```
raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
airflow.exceptions.AirflowNotFoundException: The conn_id `"{{ task_instance.xcom_pull(dag_id = 'cf_test',task_ids='get_config_val',key='http_con_id') }}"` isn't defined
```
I've been trying for days, but nothing seems to work. Can anyone point me in the right direction?

Airflow version: 2.2.3
Composer version: 2.0.11
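In `SimpleHttpOperator`, the `http_conn_id` parameter is not a templated field, so Jinja is never applied to it and its value is not rendered. When you pass `"{{ task_instance.xcom_pull(dag_id = 'cf_test',task_ids='get_config_val',key='http_con_id') }}"` to the operator, you expect it to be replaced at runtime with the value the previous task stored in XCom, but Airflow treats it as a regular string. That is exactly what the exception tells you: Airflow looks for a connection whose name is that very long string, cannot find one, and reports that the connection isn't defined.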
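The behaviour can be reproduced without Airflow. The sketch below is a toy model under stated assumptions, not Airflow's implementation: `toy_render` is a hypothetical regex stand-in for Jinja, `CONNECTIONS` is a hypothetical connection registry with a made-up URL, and `ToyHttpOperator` only mimics how an operator renders the attributes named in `template_fields` before `execute()` looks up the connection. An attribute missing from `template_fields` keeps the literal `{{ ... }}` text and produces the same kind of "isn't defined" error; listing it makes the lookup succeed.

```python
import re
from typing import Any, Dict, Tuple

# Hypothetical connection registry standing in for Airflow Connections (URL is made up).
CONNECTIONS = {"gcp_cloud_function": "https://example.com"}

# Toy stand-in for Jinja: matches "{{ name }}" placeholders.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def toy_render(value: str, context: Dict[str, Any]) -> str:
    """Replace every {{ name }} placeholder with context[name]."""
    return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)


class ToyHttpOperator:
    # Only attributes listed here are rendered before execute(),
    # mirroring the role template_fields plays on a real operator.
    template_fields: Tuple[str, ...] = ("endpoint",)

    def __init__(self, http_conn_id: str, endpoint: str) -> None:
        self.http_conn_id = http_conn_id
        self.endpoint = endpoint

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        for name in self.template_fields:
            setattr(self, name, toy_render(getattr(self, name), context))

    def execute(self) -> str:
        # The connection id is only looked up here, after rendering.
        if self.http_conn_id not in CONNECTIONS:
            raise LookupError(f"The conn_id `{self.http_conn_id}` isn't defined")
        return f"{CONNECTIONS[self.http_conn_id]}/{self.endpoint}"


class ToyHttpOperatorWithConnTemplate(ToyHttpOperator):
    # Toy version of the fix: also render http_conn_id.
    template_fields = ToyHttpOperator.template_fields + ("http_conn_id",)


# Pretend the upstream task pushed this value to XCom.
context = {"http_con_id": "gcp_cloud_function"}

plain = ToyHttpOperator("{{ http_con_id }}", "new-cp")
plain.render_template_fields(context)
try:
    plain.execute()
except LookupError as err:
    print(err)  # The conn_id `{{ http_con_id }}` isn't defined

patched = ToyHttpOperatorWithConnTemplate("{{ http_con_id }}", "new-cp")
patched.render_template_fields(context)
print(patched.execute())  # https://example.com/new-cp
```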
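To fix this, you can create a custom operator:

```python
from airflow.providers.http.operators.http import SimpleHttpOperator


class MySimpleHttpOperator(SimpleHttpOperator):
    # Add http_conn_id to the fields Airflow renders with Jinja.
    template_fields = SimpleHttpOperator.template_fields + ("http_conn_id",)
```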
Then replace `SimpleHttpOperator` with `MySimpleHttpOperator` in your DAG.
This change makes the string you set in `http_conn_id` pass through the Jinja engine, so in your case it will be replaced with the XCom value, as you expect.
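The one-line subclass relies on the parent's `template_fields` being a tuple, since `tuple + tuple` works but `tuple + list` raises `TypeError`. If you would rather not depend on that detail, unpacking the parent's fields into a new tuple works for any sequence. The sketch runs without Airflow by using a hypothetical stand-in base class; in a real DAG you would subclass `SimpleHttpOperator` directly, and the three field names on the stand-in are assumed to mirror what the HTTP provider declares, so check your installed provider version.

```python
from typing import Sequence


class StandInSimpleHttpOperator:
    """Hypothetical stand-in for SimpleHttpOperator so this runs without Airflow."""

    # Assumed to match the HTTP provider's declaration; verify against your version.
    template_fields: Sequence[str] = ("endpoint", "data", "headers")


class MySimpleHttpOperator(StandInSimpleHttpOperator):
    # Unpacking builds a new tuple whether the parent declares a tuple or a list.
    template_fields: Sequence[str] = (*StandInSimpleHttpOperator.template_fields, "http_conn_id")


print(MySimpleHttpOperator.template_fields)
# ('endpoint', 'data', 'headers', 'http_conn_id')
```

Because the subclass assigns a new tuple instead of mutating the inherited attribute, the parent class keeps its original fields and other `SimpleHttpOperator` tasks in the same environment are unaffected.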