Airflow - 如何处理异步 API 调用?
Airflow - How to handle Asynchronous API calls?
我正在尝试弄清楚如何最好地解决以下问题。本质上,我有一个外部 API 服务,我正在向其发送请求并获取结果。
POST = 发送请求,您得到的响应是 URL,您可以将其用于 GET 请求以检索结果。
GET = 轮询 POST 请求返回的 URL,直到获得成功结果。
在气流中解决这个问题的最佳方法是什么?我的想法是基本上并行执行 2 个任务 运行。
- 发送 POST 请求,然后将响应 URL 保存到 XCOM。
- 另一个将在 while 循环中连续 运行,从 XCOM 存储中读取新的 URL 响应并获取响应。一旦从 URL.
中检索到成功结果,它就会从 XCOM 存储中删除
您认为这是正确的做法吗?或者我应该在 python?
中使用 asyncio 库
非常感谢任何帮助
谢谢,
您可以使用 Airflow 中的 SimpleHttpOperator
和 HttpSensor
实现您所描述的内容(无需安装任何额外的包)。
考虑这个使用 http_default 连接到 http bin 的例子。
要执行的任务POST请求:
task_post_op = SimpleHttpOperator(
task_id='post_op',
# http_conn_id='your_conn_id',
endpoint='post',
data=json.dumps({"priority": 5}),
headers={"Content-Type": "application/json"},
response_check=lambda response: response.json()['json']['priority'] == 5,
response_filter=lambda response: 'get', # e.g lambda response: json.loads(response.text)
dag=dag,
)
通过提供 response_filter
,您可以操纵响应结果,这将是推送到 XCom
的值。在您的情况下,您应该 return 在下一个任务中要轮询的端点。
response_filter: A function allowing you to manipulate the response
text. e.g response_filter=lambda response: json.loads(response.text).
The callable takes the response object as the first positional argument
and optionally any number of keyword arguments available in the context dictionary.
:type response_filter: A lambda or defined function.
请注意 response_check 参数是可选的。
执行GET请求的任务:
使用 HttpSensor 戳直到 response_check
可调用计算结果为真。
task_http_sensor_check = HttpSensor(
task_id='http_sensor_check',
# http_conn_id='your_conn_id',
endpoint=task_post_op.output,
request_params={},
response_check=lambda response: "httpbin" in response.text,
poke_interval=5,
dag=dag,
)
作为 endpoint
参数,我们使用 XComArg 传递从上一个任务中提取的 XCom 值。
使用 poke_interval
定义作业在每次尝试之间应等待的时间(以秒为单位)。
记得创建一个你自己的 Connection 来定义基础 URL、端口等
让我知道这是否对您有用!
我正在尝试弄清楚如何最好地解决以下问题。本质上,我有一个外部 API 服务,我正在向其发送请求并获取结果。
POST = 发送请求,您得到的响应是 URL,您可以将其用于 GET 请求以检索结果。
GET = 轮询 POST 请求返回的 URL,直到获得成功结果。
在气流中解决这个问题的最佳方法是什么?我的想法是基本上并行执行 2 个任务 运行。
- 发送 POST 请求,然后将响应 URL 保存到 XCOM。
- 另一个将在 while 循环中连续 运行,从 XCOM 存储中读取新的 URL 响应并获取响应。一旦从 URL. 中检索到成功结果,它就会从 XCOM 存储中删除
您认为这是正确的做法吗?或者我应该在 python?
中使用 asyncio 库非常感谢任何帮助
谢谢,
您可以使用 Airflow 中的 SimpleHttpOperator
和 HttpSensor
实现您所描述的内容(无需安装任何额外的包)。
考虑这个使用 http_default 连接到 http bin 的例子。
要执行的任务POST请求:
task_post_op = SimpleHttpOperator(
task_id='post_op',
# http_conn_id='your_conn_id',
endpoint='post',
data=json.dumps({"priority": 5}),
headers={"Content-Type": "application/json"},
response_check=lambda response: response.json()['json']['priority'] == 5,
response_filter=lambda response: 'get', # e.g lambda response: json.loads(response.text)
dag=dag,
)
通过提供 response_filter
,您可以操纵响应结果,这将是推送到 XCom
的值。在您的情况下,您应该 return 在下一个任务中要轮询的端点。
response_filter: A function allowing you to manipulate the response text. e.g response_filter=lambda response: json.loads(response.text). The callable takes the response object as the first positional argument and optionally any number of keyword arguments available in the context dictionary. :type response_filter: A lambda or defined function.
请注意 response_check 参数是可选的。
执行GET请求的任务:
使用 HttpSensor 戳直到 response_check
可调用计算结果为真。
task_http_sensor_check = HttpSensor(
task_id='http_sensor_check',
# http_conn_id='your_conn_id',
endpoint=task_post_op.output,
request_params={},
response_check=lambda response: "httpbin" in response.text,
poke_interval=5,
dag=dag,
)
作为 endpoint
参数,我们使用 XComArg 传递从上一个任务中提取的 XCom 值。
使用 poke_interval
定义作业在每次尝试之间应等待的时间(以秒为单位)。
记得创建一个你自己的 Connection 来定义基础 URL、端口等
让我知道这是否对您有用!