URL 的分页气流循环
Airflow loop through pagination for URL
我创建了一个 DAG,用于从 API 获取 JSON 数据。从 API 获取数据后,它将数据上传到 S3。来自 API 的数据是分页的,我一次只能得到 100 条记录。根据创建的数据,我可能需要循环几次才能获取数据。我只知道从 API.
中选取第一个数据后要循环多少页
所以我的任务是
initialize_env = DummyOperator(task_id='initialize_env')
get_data = SimpleHttpOperator(task_id='get_data',
url=url,
endpoint='users?page={page_id}') #this task loops number of times
upload_to_s3 = MyOperator(task_id='upload_to_s3') #this task loops number of times
read_data = PythonOperator(task_id='upload_to_s3',
python_callable=function_reading_data) #this task should stop the loop
send_email = DummyOperator(task_id='send_email') #send email at the end of looping
我想了解如何在 Airflow 中进行循环以便浏览所有页面?
循环任务在 Airflow 中不是一个好的做法(如果可能的话)。
我建议使用 Python 运算符来为您执行循环。在 python 运算符中创建循环以点击 API 并上传到 S3。或者,您可以遍历并在 get_data 中创建一个页面列表,通过 XCOM 传递该列表以上传,然后再次传递相同的列表以读取数据。
取决于你想投入多少 python 以及你想使用多少任务。
理论上您可以将所有内容都放在 1 个 python 运算符中,但这也不是很好的做法。
我创建了一个 DAG,用于从 API 获取 JSON 数据。从 API 获取数据后,它将数据上传到 S3。来自 API 的数据是分页的,我一次只能得到 100 条记录。根据创建的数据,我可能需要循环几次才能获取数据。我只知道从 API.
中选取第一个数据后要循环多少页所以我的任务是
initialize_env = DummyOperator(task_id='initialize_env')
get_data = SimpleHttpOperator(task_id='get_data',
url=url,
endpoint='users?page={page_id}') #this task loops number of times
upload_to_s3 = MyOperator(task_id='upload_to_s3') #this task loops number of times
read_data = PythonOperator(task_id='upload_to_s3',
python_callable=function_reading_data) #this task should stop the loop
send_email = DummyOperator(task_id='send_email') #send email at the end of looping
我想了解如何在 Airflow 中进行循环以便浏览所有页面?
循环任务在 Airflow 中不是一个好的做法(如果可能的话)。
我建议使用 Python 运算符来为您执行循环。在 python 运算符中创建循环以点击 API 并上传到 S3。或者,您可以遍历并在 get_data 中创建一个页面列表,通过 XCOM 传递该列表以上传,然后再次传递相同的列表以读取数据。
取决于你想投入多少 python 以及你想使用多少任务。
理论上您可以将所有内容都放在 1 个 python 运算符中,但这也不是很好的做法。