URL 的分页气流循环

Airflow loop through pagination for URL

我创建了一个 DAG,用于从 API 获取 JSON 数据。从 API 获取数据后,它将数据上传到 S3。来自 API 的数据是分页的,我一次只能得到 100 条记录。根据创建的数据,我可能需要循环几次才能获取数据。我只知道从 API.

中选取第一个数据后要循环多少页

所以我的任务是

initialize_env = DummyOperator(task_id='initialize_env')

get_data = SimpleHttpOperator(task_id='get_data',
                              url=url,
                              endpoint='users?page={page_id}') #this task loops number of times
                              
upload_to_s3 = MyOperator(task_id='upload_to_s3') #this task loops number of times

read_data = PythonOperator(task_id='upload_to_s3',
                            python_callable=function_reading_data) #this task should stop the loop
                            
send_email = DummyOperator(task_id='send_email') #send email at the end of looping

我想了解如何在 Airflow 中进行循环以便浏览所有页面?

循环任务在 Airflow 中不是一个好的做法(如果可能的话)。

我建议使用 Python 运算符来为您执行循环。在 python 运算符中创建循环以点击 API 并上传到 S3。或者,您可以遍历并在 get_data 中创建一个页面列表,通过 XCOM 传递该列表以上传,然后再次传递相同的列表以读取数据。

取决于你想投入多少 python 以及你想使用多少任务。

理论上您可以将所有内容都放在 1 个 python 运算符中,但这也不是很好的做法。