在另一个任务中访问来自 SimpleHTTPOperator 的响应
Accessing response from SimpleHTTPOperator in another task
关于 ,假设我们有一个包含两个任务的 Apache Airflow DAG,首先是一个 HTTP 请求(即 SimpleHTTPOperator),然后是一个对第一个任务的响应执行某些操作的 PythonOperator。
方便地,以 Dog CEO API 为例,考虑以下 DAG:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['someone@email.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1),
}
with DAG(
'dog_api',
default_args=default_args,
description='Get nice dog pics',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['dog'],
) as dag:
get_dog = SimpleHttpOperator(
task_id='get_dog',
http_conn_id='dog_api', # NOTE: set up an HTTP connection called 'dog_api' with host 'https://dog.ceo/api'
endpoint='/breeds/image/random',
method="GET",
# xcom_push=True # NOTE: no such argument in 2.2.0 but sometimes suggested by older guides online
)
def xcom_check(ds, **kwargs):
val = kwargs['ti'].xcom_pull(key='return_value', task_ids='get_dog')
return f"xcom_check has: {kwargs['ti']} and it says: {val}"
inspect_dog = PythonOperator(
task_id='inspect_dog',
python_callable=xcom_check,
provide_context=True
)
我们想访问 xcom_check
中 get_dog
的 return 值。通过检查日志,get_dog
将 xcom 存储很好地填充为如下内容:
但是现在,这个目前还没有传递给第二个任务。这也可以通过检查日志看出,其中说(除其他外):
*redacted* Returned value was: xcom_check has: <TaskInstance: dog_api.inspect_dog manual__2021-10-30T16:27:23.081539+00:00 [running]> and it says: None
很明显,任务实例是“dog_api.inspect_dog”,但我们希望它是“dog_api.get_dog”。这是怎么做到的?在撰写本文时,在上一个问题的评论中提出了相同的问题,已投票,但未得到答复。我也试过 但无法弄清楚我还在做什么不同的事情。
你的问题是你没有设置任务之间的依赖关系,所以 inspect_dog
可能 运行 在 get_dog
之前或与 get_dog
并行,当这种情况发生时 get_dog
将看不到xcom 值,因为 inspect_dog
还没有推送它。
你只需要将依赖设置为:
get_dog >> inspect_dog
日志:
[2021-10-31, 07:07:21 UTC] {python.py:174} INFO - Done. Returned value was: xcom_check has: <TaskInstance: dog_api.inspect_dog manual__2021-10-31T07:05:27.721051+00:00 [running]> and it says: {"message":"https:\/\/images.dog.ceo\/breeds\/pointer-germanlonghair\/hans1.jpg","status":"success"}
至于您在代码中对 xcom_push
的评论:
xcom_push
参数用于较旧的 Airflow 版本。它被 do_xcom_push
取代(参见 source code)。请注意,此参数的默认值为 True。
关于
方便地,以 Dog CEO API 为例,考虑以下 DAG:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['someone@email.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1),
}
with DAG(
'dog_api',
default_args=default_args,
description='Get nice dog pics',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['dog'],
) as dag:
get_dog = SimpleHttpOperator(
task_id='get_dog',
http_conn_id='dog_api', # NOTE: set up an HTTP connection called 'dog_api' with host 'https://dog.ceo/api'
endpoint='/breeds/image/random',
method="GET",
# xcom_push=True # NOTE: no such argument in 2.2.0 but sometimes suggested by older guides online
)
def xcom_check(ds, **kwargs):
val = kwargs['ti'].xcom_pull(key='return_value', task_ids='get_dog')
return f"xcom_check has: {kwargs['ti']} and it says: {val}"
inspect_dog = PythonOperator(
task_id='inspect_dog',
python_callable=xcom_check,
provide_context=True
)
我们想访问 xcom_check
中 get_dog
的 return 值。通过检查日志,get_dog
将 xcom 存储很好地填充为如下内容:
但是现在,这个目前还没有传递给第二个任务。这也可以通过检查日志看出,其中说(除其他外):
*redacted* Returned value was: xcom_check has: <TaskInstance: dog_api.inspect_dog manual__2021-10-30T16:27:23.081539+00:00 [running]> and it says: None
很明显,任务实例是“dog_api.inspect_dog”,但我们希望它是“dog_api.get_dog”。这是怎么做到的?在撰写本文时,在上一个问题的评论中提出了相同的问题,已投票,但未得到答复。我也试过
你的问题是你没有设置任务之间的依赖关系,所以 inspect_dog
可能 运行 在 get_dog
之前或与 get_dog
并行,当这种情况发生时 get_dog
将看不到xcom 值,因为 inspect_dog
还没有推送它。
你只需要将依赖设置为:
get_dog >> inspect_dog
日志:
[2021-10-31, 07:07:21 UTC] {python.py:174} INFO - Done. Returned value was: xcom_check has: <TaskInstance: dog_api.inspect_dog manual__2021-10-31T07:05:27.721051+00:00 [running]> and it says: {"message":"https:\/\/images.dog.ceo\/breeds\/pointer-germanlonghair\/hans1.jpg","status":"success"}
至于您在代码中对 xcom_push
的评论:
xcom_push
参数用于较旧的 Airflow 版本。它被 do_xcom_push
取代(参见 source code)。请注意,此参数的默认值为 True。