如何在 Apache Airflow 上处理多个 http 请求
How to handle multiple http requests on Apache Airflow
我有一个包含大约 90 000 种产品的数据库,我需要使用 API 对每个产品的响应来更新此数据库。问题是,这个 API 是单次调用(我需要一次发送一个产品 SKU [请求]),所以基本上,我需要调用这个 API 90000 次。
我知道airflow有HTTP包:
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
我可以使用它们创建一个任务,例如:
task_get_op = SimpleHttpOperator(
task_id='get_op',
method='GET',
endpoint='get',
data={"param1": "value1", "param2": "value2"},
headers={},
dag=dag,
)
- 考虑到 DAG 是非循环的,我该如何调用这个任务 multiple
不同参数的时间? (90 000 次不同的 SKU 更多
正是...)
对于 90,000 个调用,一组 Airflow 任务不是完成这项工作的正确工具,它会非常缓慢和繁琐。 Airflow 还会为每个 HttpOperator 调用进行大量日志记录,因此它会泛滥日志等。
如果您真的想在 Airflow 中执行此操作 multi-threaded,我建议的最佳选择是将您的 90k 分成大约 9 组 10k 调用,并让 DAG 包含 9 个任务,每个任务使用一个PythonOperator 循环 10k 次。然后这 9 个任务可以 运行 并行,这应该会稍微加快速度,但它仍然不会提高性能。
我还建议您不必 运行 更新每一行数据,您必须查看是否有大量数据 API 您可以改为调用所有产品,可能会导入每个更新将每个产品转换为另一个 table,然后编写一个查询,在数据库中进行一次更新。
我有一个包含大约 90 000 种产品的数据库,我需要使用 API 对每个产品的响应来更新此数据库。问题是,这个 API 是单次调用(我需要一次发送一个产品 SKU [请求]),所以基本上,我需要调用这个 API 90000 次。
我知道airflow有HTTP包:
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
我可以使用它们创建一个任务,例如:
task_get_op = SimpleHttpOperator(
task_id='get_op',
method='GET',
endpoint='get',
data={"param1": "value1", "param2": "value2"},
headers={},
dag=dag,
)
- 考虑到 DAG 是非循环的,我该如何调用这个任务 multiple 不同参数的时间? (90 000 次不同的 SKU 更多 正是...)
对于 90,000 个调用,一组 Airflow 任务不是完成这项工作的正确工具,它会非常缓慢和繁琐。 Airflow 还会为每个 HttpOperator 调用进行大量日志记录,因此它会泛滥日志等。
如果您真的想在 Airflow 中执行此操作 multi-threaded,我建议的最佳选择是将您的 90k 分成大约 9 组 10k 调用,并让 DAG 包含 9 个任务,每个任务使用一个PythonOperator 循环 10k 次。然后这 9 个任务可以 运行 并行,这应该会稍微加快速度,但它仍然不会提高性能。
我还建议您不必 运行 更新每一行数据,您必须查看是否有大量数据 API 您可以改为调用所有产品,可能会导入每个更新将每个产品转换为另一个 table,然后编写一个查询,在数据库中进行一次更新。