来自不可靠网络的 Airflow HTTP 调用
Airflow HTTP call from unreliable network
我需要通过 HTTP get on apache airflow 从 REST API 获取数据(例如到 https://something.com/api/data)。
数据来自具有以下结构的页面:
{
"meta" : {
"size" : 50,
"currentPage" : 3,
"totalPage" : 10
},
"data" : [
....
]
}
问题是,API 提供商不可靠。有时我们会收到 504 网关超时。所以我必须重试 API 调用,直到当前页面 = 总页数,如果出现 504 网关超时则重试。但是整个重试过程不能超过15分钟。
有什么方法可以使用 apache airflow 实现这一点?
谢谢
您可以使用 HTTP Operator from HTTP providers package。查看这些链接中的示例和指南。
如果您还没有,请先安装提供程序包:
pip install apache-airflow-providers-http
然后您可以尝试将请求发送到 https://httpbin.org。为此,请创建如下连接:
您可以使用 SimpleHttpOperator
:
创建任务
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
with DAG(
'example_http_operator',
default_args={
'retries': 1,
'retry_delay': timedelta(minutes=5),
},
start_date=datetime(2021, 10, 9),
) as dag:
task_get_op = SimpleHttpOperator(
task_id='get_op',
method='GET',
endpoint='get',
data={"param1": "value1", "param2": "value2"},
headers={},
)
默认情况下,此运算符在后台对获得的响应执行 raise_for_status。因此,如果响应 status_code
不在 1xx 或 2xx 的范围内,将引发异常并且任务将被标记为 failed
。如果您想自定义此行为,您可以提供自己的 response_check
作为 SimpleHttpOperator
的参数
:param response_check: A check against the 'requests' response object.
The callable takes the response object as the first positional argumentand optionally any number of keyword arguments available in the context dictionary.
It should return True for 'pass' and False otherwise.
:type response_check: A lambda or defined function.
最后,为了根据需要处理失败的重试,您可以在 Airflow (docs) 的任何 Operator 中使用以下可用参数:
retries (int) -- the number of retries that should be performed before failing the task
retry_delay (datetime.timedelta) -- delay between retries
retry_exponential_backoff (bool) -- allow progressive longer waits between retries by using exponential backoff algorithm on retry delay (delay will be converted into seconds)
max_retry_delay (datetime.timedelta) -- maximum delay interval between retries
最后,要尝试一切如何协同工作,请对端点执行请求,该端点将使用特定的错误状态代码进行回答:
task_get_op = SimpleHttpOperator(
task_id='get_op',
method='GET',
endpoint='status/400', # response stus code will be 400
data={"param1": "value1", "param2": "value2"},
headers={},
)
让我知道这是否适合您!
我需要通过 HTTP get on apache airflow 从 REST API 获取数据(例如到 https://something.com/api/data)。 数据来自具有以下结构的页面:
{
"meta" : {
"size" : 50,
"currentPage" : 3,
"totalPage" : 10
},
"data" : [
....
]
}
问题是,API 提供商不可靠。有时我们会收到 504 网关超时。所以我必须重试 API 调用,直到当前页面 = 总页数,如果出现 504 网关超时则重试。但是整个重试过程不能超过15分钟。
有什么方法可以使用 apache airflow 实现这一点?
谢谢
您可以使用 HTTP Operator from HTTP providers package。查看这些链接中的示例和指南。
如果您还没有,请先安装提供程序包:
pip install apache-airflow-providers-http
然后您可以尝试将请求发送到 https://httpbin.org。为此,请创建如下连接:
您可以使用 SimpleHttpOperator
:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
with DAG(
'example_http_operator',
default_args={
'retries': 1,
'retry_delay': timedelta(minutes=5),
},
start_date=datetime(2021, 10, 9),
) as dag:
task_get_op = SimpleHttpOperator(
task_id='get_op',
method='GET',
endpoint='get',
data={"param1": "value1", "param2": "value2"},
headers={},
)
默认情况下,此运算符在后台对获得的响应执行 raise_for_status。因此,如果响应 status_code
不在 1xx 或 2xx 的范围内,将引发异常并且任务将被标记为 failed
。如果您想自定义此行为,您可以提供自己的 response_check
作为 SimpleHttpOperator
:param response_check: A check against the 'requests' response object. The callable takes the response object as the first positional argumentand optionally any number of keyword arguments available in the context dictionary. It should return True for 'pass' and False otherwise. :type response_check: A lambda or defined function.
最后,为了根据需要处理失败的重试,您可以在 Airflow (docs) 的任何 Operator 中使用以下可用参数:
retries (int) -- the number of retries that should be performed before failing the task
retry_delay (datetime.timedelta) -- delay between retries
retry_exponential_backoff (bool) -- allow progressive longer waits between retries by using exponential backoff algorithm on retry delay (delay will be converted into seconds)
max_retry_delay (datetime.timedelta) -- maximum delay interval between retries
最后,要尝试一切如何协同工作,请对端点执行请求,该端点将使用特定的错误状态代码进行回答:
task_get_op = SimpleHttpOperator(
task_id='get_op',
method='GET',
endpoint='status/400', # response stus code will be 400
data={"param1": "value1", "param2": "value2"},
headers={},
)
让我知道这是否适合您!