Airflow - 如何处理异步 API 调用?

Airflow - How to handle Asynchronous API calls?

我正在尝试弄清楚如何最好地解决以下问题。本质上,我有一个外部 API 服务,我正在向其发送请求并获取结果。

POST = 发送请求,您得到的响应是 URL,您可以将其用于 GET 请求以检索结果。

GET = 轮询 POST 请求返回的 URL,直到获得成功结果。

在气流中解决这个问题的最佳方法是什么?我的想法是基本上并行执行 2 个任务 运行。

  1. 发送 POST 请求,然后将响应 URL 保存到 XCOM。
  2. 另一个将在 while 循环中连续 运行,从 XCOM 存储中读取新的 URL 响应并获取响应。一旦从 URL.
  3. 中检索到成功结果,它就会从 XCOM 存储中删除

您认为这是正确的做法吗?或者我应该在 python?

中使用 asyncio 库

非常感谢任何帮助

谢谢,

您可以使用 Airflow 中的 SimpleHttpOperatorHttpSensor 实现您所描述的内容(无需安装任何额外的包)。

考虑这个使用 http_default 连接到 http bin 的例子。

要执行的任务POST请求:

task_post_op = SimpleHttpOperator(
    task_id='post_op',
    # http_conn_id='your_conn_id',
    endpoint='post',
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    response_check=lambda response: response.json()['json']['priority'] == 5,
    response_filter=lambda response: 'get', # e.g  lambda response: json.loads(response.text)
    dag=dag, 
)

通过提供 response_filter,您可以操纵响应结果,这将是推送到 XCom 的值。在您的情况下,您应该 return 在下一个任务中要轮询的端点。

response_filter: A function allowing you to manipulate the response text. e.g response_filter=lambda response: json.loads(response.text). The callable takes the response object as the first positional argument and optionally any number of keyword arguments available in the context dictionary. :type response_filter: A lambda or defined function.

请注意 response_check 参数是可选的。

执行GET请求的任务:

使用 HttpSensor 戳直到 response_check 可调用计算结果为真。

task_http_sensor_check = HttpSensor(
    task_id='http_sensor_check',
    # http_conn_id='your_conn_id',
    endpoint=task_post_op.output, 
    request_params={},
    response_check=lambda response: "httpbin" in response.text,
    poke_interval=5,
    dag=dag,
)

作为 endpoint 参数,我们使用 XComArg 传递从上一个任务中提取的 XCom 值。 使用 poke_interval 定义作业在每次尝试之间应等待的时间(以秒为单位)。

记得创建一个你自己的 Connection 来定义基础 URL、端口等

让我知道这是否对您有用!