如何在 Apache Airflow 上处理多个 http 请求

How to handle multiple http requests on Apache Airflow

我有一个包含大约 90 000 种产品的数据库,我需要使用 API 对每个产品的响应来更新此数据库。问题是,这个 API 是单次调用(我需要一次发送一个产品 SKU [请求]),所以基本上,我需要调用这个 API 90000 次。

我知道airflow有HTTP包:

from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor

我可以使用它们创建一个任务,例如:

task_get_op = SimpleHttpOperator(
    task_id='get_op',
    method='GET',
    endpoint='get',
    data={"param1": "value1", "param2": "value2"},
    headers={},
    dag=dag,
)

对于 90,000 个调用,一组 Airflow 任务不是完成这项工作的正确工具,它会非常缓慢和繁琐。 Airflow 还会为每个 HttpOperator 调用进行大量日志记录,因此它会泛滥日志等。

如果您真的想在 Airflow 中执行此操作 multi-threaded,我建议的最佳选择是将您的 90k 分成大约 9 组 10k 调用,并让 DAG 包含 9 个任务,每个任务使用一个PythonOperator 循环 10k 次。然后这 9 个任务可以 运行 并行,这应该会稍微加快速度,但它仍然不会提高性能。

我还建议您不必 运行 更新每一行数据,您必须查看是否有大量数据 API 您可以改为调用所有产品,可能会导入每个更新将每个产品转换为另一个 table,然后编写一个查询,在数据库中进行一次更新。