如何访问 Airflow SimpleHttpOperator GET 请求的响应

How to access the response from Airflow SimpleHttpOperator GET request

我正在学习 Airflow,有一个简单的问题。下面是我的 DAG,名为 dog_retriever:

import airflow
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
from datetime import datetime, timedelta
import json



default_args = {
    'owner': 'Loftium',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 9),
    'email': 'rachel@loftium.com',
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=3),
}

dag = DAG('dog_retriever',
    schedule_interval='@once',
    default_args=default_args)

t1 = SimpleHttpOperator(
    task_id='get_labrador',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breed/labrador/images',
    headers={"Content-Type": "application/json"},
    dag=dag)

t2 = SimpleHttpOperator(
    task_id='get_breeds',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breeds/list',
    headers={"Content-Type": "application/json"},
    dag=dag)
    
t2.set_upstream(t1)

作为测试 Airflow 的一种方式,我只是在这个非常简单的 http://dog.ceo API 中向某些端点发出两个 GET 请求。目标是学习如何处理通过 Airflow

检索到的一些数据

执行正常 - 我的代码成功调用了任务 t1 和 t2 中的端点,我可以看到它们以基于 set_upstream 规则的正确顺序记录在 Airflow UI 中我写了。

我想不通的是如何访问这 2 个任务的 JSON 响应。看起来很简单,但我想不通。在 SimpleHtttpOperator 中,我看到 response_check 的参数,但没有任何内容可以简单地打印、存储或查看 JSON 响应。

谢谢。

因此,由于这是 SimpleHttpOperator,实际的 json 被推送到 XCOM,您可以从那里获取它。这是该操作的代码行:https://github.com/apache/incubator-airflow/blob/master/airflow/operators/http_operator.py#L87

您需要做的是设置 xcom_push=True,因此您的第一个 t1 将如下所示:

t1 = SimpleHttpOperator(
    task_id='get_labrador',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breed/labrador/images',
    headers={"Content-Type": "application/json"},
    xcom_push=True,
    dag=dag)

你应该能够在 XCOM 中找到所有带有 return value 的 JSON,XCOM 的更多详细信息可以在以下位置找到:https://airflow.incubator.apache.org/concepts.html#xcoms

我主要为任何试图(或想要)调用来自流程的 Airflow 工作流 DAG 并接收的人添加此答案 来自 DAG 的 activity.

的任何 数据

重要了解 运行 DAG 需要 HTTP POST 并且 response 这个 POST 在 Airflow 中是 硬编码 ,即如果不更改 Airflow 代码本身,Airflow 永远不会 return 除了状态代码和消息到请求进程。

Airflow 似乎主要用于为 ETL(提取、转换、加载)工作流创建数据管道,现有的 Airflow Operators,例如SimpleHttpOperator,可以从 RESTful 网络服务 获取数据,对其进行处理,并使用其他运算符将其写入数据库,但不要 return在对 HTTP POST 的响应中,运行 是工作流 DAG。

即使操作员在响应中做了 return 此数据,查看 Airflow 源代码确认 trigger_dag() 方法不检查或 return 它:

apache_airflow_airflow_www_api_experimental_endpoints.py

apache_airflow_airflow_api_client_json_client.py

它所做的一切 return 就是这条确认消息:

Airflow DagRun Message Received in Orchestration Service

由于 Airflow 是开源的,我想我们可以将 trigger_dag() 方法修改为 return 数据,但那样我们就只能维护分叉的代码库了,我们不会这样做能够在 Google Cloud Platform 上使用 cloud-hosted、Airflow-based 服务,例如 Cloud Composer,因为它不包含我们的修改。

更糟糕的是,Apache Airflow 甚至没有 return 正确处理其 hard-coded 状态消息。

当我们 POST 成功到达 Airflow /dags/{DAG-ID}/dag_runs 端点时,我们收到“200 OK”响应, 一个“201 已创建”的响应,这是我们应该做的。 Airflow“hard-codes”响应的内容body及其“已创建...”状态消息。 标准,但是,是在响应header中对return新建资源的Uri,而不是在body …这将使 body 可以自由使用 return 任何数据 produced/aggregated 在此创建期间(或由此产生)。

我将此缺陷归因于“盲目”(或我称之为“天真”)Agile/MVP-driven 方法,该方法仅添加了要求的功能,而不是保持意识并为更通用的实用程序留出空间.由于 Airflow 绝大多数用于为(和由)数据科学家(而非软件工程师)创建数据管道Airflow 操作员可以相互共享数据 使用其专有的内部 XCom 功能作为 @Chengzhi 的有用答案指出(谢谢!)但在任何情况下都不能 return 向请求者提供数据 DAG,即 SimpleHttpOperator 可以从 third-party RESTful 服务检索数据,并可以与 PythonOperator(通过 XCom)共享该数据,从而丰富、聚合、and/or 转换它。然后,PythonOperator 可以与将结果直接存储在数据库中的 PostgresOperator 共享其数据。但是 结果永远无法 return 发送到请求完成工作的进程 ,即我们的编排服务,这使得 Airflow 对于任何用例都无用,但被驱动的用例除外它的当前用户。

这里的要点(至少对我而言)是 1) 永远不要将过多的专业知识归于任何人或任何组织。 Apache 是一个重要的组织,在软件开发方面有着深厚而重要的根基……但它们并不完美。并且 2) 始终提防内部专有解决方案。开放的 standards-based 解决方案已经从许多不同的角度进行了检查和审查,而不仅仅是一个。

我花了将近一个星期的时间来寻找不同的方法来做一件看似非常简单合理的事情。我希望这个答案能为其他人节省一些时间。