在另一个任务中访问来自 SimpleHTTPOperator 的响应

Accessing response from SimpleHTTPOperator in another task

关于 ,假设我们有一个包含两个任务的 Apache Airflow DAG,首先是一个 HTTP 请求(即 SimpleHTTPOperator),然后是一个对第一个任务的响应执行某些操作的 PythonOperator。

方便地,以 Dog CEO API 为例,考虑以下 DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['someone@email.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}
with DAG(
    'dog_api',
    default_args=default_args,
    description='Get nice dog pics',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['dog'],
) as dag:
    get_dog = SimpleHttpOperator(
        task_id='get_dog',
        http_conn_id='dog_api', # NOTE: set up an HTTP connection called 'dog_api' with host 'https://dog.ceo/api'
        endpoint='/breeds/image/random',
        method="GET",
        # xcom_push=True # NOTE: no such argument in 2.2.0 but sometimes suggested by older guides online
    )
    
    def xcom_check(ds, **kwargs):
        val = kwargs['ti'].xcom_pull(key='return_value', task_ids='get_dog')
        return f"xcom_check has: {kwargs['ti']} and it says: {val}"
     
    inspect_dog = PythonOperator(
        task_id='inspect_dog',
        python_callable=xcom_check,
        provide_context=True
    )

我们想访问 xcom_checkget_dog 的 return 值。通过检查日志,get_dog 将 xcom 存储很好地填充为如下内容:

但是现在,这个目前还没有传递给第二个任务。这也可以通过检查日志看出,其中说(除其他外):

*redacted* Returned value was: xcom_check has: <TaskInstance: dog_api.inspect_dog manual__2021-10-30T16:27:23.081539+00:00 [running]> and it says: None

很明显,任务实例是“dog_api.inspect_dog”,但我们希望它是“dog_api.get_dog”。这是怎么做到的?在撰写本文时,在上一个问题的评论中提出了相同的问题,已投票,但未得到答复。我也试过 但无法弄清楚我还在做什么不同的事情。

你的问题是你没有设置任务之间的依赖关系,所以 inspect_dog 可能 运行 在 get_dog 之前或与 get_dog 并行,当这种情况发生时 get_dog 将看不到xcom 值,因为 inspect_dog 还没有推送它。

你只需要将依赖设置为:

get_dog >> inspect_dog

日志:

[2021-10-31, 07:07:21 UTC] {python.py:174} INFO - Done. Returned value was: xcom_check has: <TaskInstance: dog_api.inspect_dog manual__2021-10-31T07:05:27.721051+00:00 [running]> and it says: {"message":"https:\/\/images.dog.ceo\/breeds\/pointer-germanlonghair\/hans1.jpg","status":"success"}

至于您在代码中对 xcom_push 的评论: xcom_push 参数用于较旧的 Airflow 版本。它被 do_xcom_push 取代(参见 source code)。请注意,此参数的默认值为 True。