运行 基于 Web 请求的 Airflow 作业
Running Job On Airflow Based On Webrequest
我想知道是否可以在通过 HTTP 收到请求后执行气流任务。我对 Airflow 的调度部分不感兴趣。我只是想用它来代替芹菜。
所以一个示例操作应该是这样的。
- 用户提交了一个请求报告的表单。
- 后端收到请求并向用户发送请求已收到的通知。
- 然后后端立即使用 Airflow 安排作业 运行。
- Airflow 然后执行一系列与 DAG 关联的任务。比如先从redshift拉取数据,再从MySQL拉取数据,对两个结果集做一些操作,合并后将结果上传到Amazon S3,发邮件。[=25=]
根据我在网上阅读的内容,您可以通过在命令行上执行 airflow ...
来 运行 airflow 作业。我想知道是否有一个 python api 可以执行同样的事情。
谢谢。
您应该根据自己的需要查看 Airflow HTTP Sensor。您可以使用它来触发 dag。
Airflow REST API Plugin 会在这里帮助您。按照说明安装插件后,您只需点击以下 url:http://{HOST}:{PORT}/admin/rest_api/api/v1.0/trigger_dag?dag_id={dag_id}&run_id={run_id}&conf={url_encoded_json_parameters}
,将 dag_id 替换为您的 dag 的 ID,或者省略 run_id或者指定一个唯一的 id,并为 conf 传递一个 url 编码的 json(在触发的 dag 中包含您需要的任何参数)。
这是一个示例 JavaScript 函数,它使用 jQuery 调用 Airflow api:
function triggerDag(dagId, dagParameters){
var urlEncodedParameters = encodeURIComponent(dagParameters);
var dagRunUrl = "http://airflow:8080/admin/rest_api/api/v1.0/trigger_dag?dag_id="+dagId+"&conf="+urlEncodedParameters;
$.ajax({
url: dagRunUrl,
dataType: "json",
success: function(msg) {
console.log('Successfully started the dag');
},
error: function(e){
console.log('Failed to start the dag');
}
});
}
airflow 中的一个新选项是 实验性的 ,但在 1.7 和 1.8 的最新版本中是内置的 API 端点。这允许您 运行 气流服务器上的 REST 服务来侦听端口并接受 cli 作业。
我自己的经验有限,但我有 运行 次测试成功。根据文档:
/api/experimental/dags/<DAG_ID>/dag_runs
为给定的 dag id (POST) 创建一个 dag_run。
这将立即安排 运行 您想要 运行 的任何日期。不过,它仍然使用调度程序,等待心跳以查看 dag 运行ning 并将任务传递给 worker。不过,这与 CLI 的行为完全相同,所以我仍然相信它适合您的用例。
有关如何配置它的文档可在此处获得:https://airflow.apache.org/api.html
githubairflow/api/clients
下也有一些简单的示例客户端
Airflow 的实验性 REST API 接口可用于此目的。
以下请求将触发 DAG:
curl -X POST \
http://<HOST>:8080/api/experimental/dags/process_data/dag_runs \
-H 'Cache-Control: no-cache' \
-H 'Content-Type: application/json' \
-d '{"conf":"{\"START_DATE\":\"2018-06-01 03:00:00\", \"STOP_DATE\":\"2018-06-01 23:00:00\"}'
以下请求检索特定 DAG ID 的 Dag 运行列表:
curl -i -H "Accept: application/json" -H "Content-Type: application/json" -X GET http://<HOST>:8080/api/experimental/dags/process_data/dag_runs
要使 GET API 正常工作,请将 rbac
标志设置为 True
,位于 airflow.cfg
。
更新: 稳定的 Airflow REST API 已发布:
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
几乎所有内容都保持不变,除了 API URL 变化。
此外,“conf”现在需要成为一个对象,所以我添加了额外的包装:
def trigger_dag_v2(self, dag_id, run_id=None, conf=None, execution_date=None):
endpoint = '/api/v1/dags/{}/dagRuns'.format(dag_id)
url = urljoin(self._api_base_url, endpoint)
data = self._request(url, method='POST',
json={
"run_id": run_id,
"conf": {'conf': json.dumps(event)},
"execution_date": execution_date,
})
return data['message']
旧答案:
Airflow 具有 REST API(目前处于试验阶段)- 可在此处获取:
https://airflow.apache.org/api.html#endpoints
如果您不想按照其他答案中的建议安装插件 - 这是代码,您可以如何直接使用 API:
def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
endpoint = '/api/experimental/dags/{}/dag_runs'.format(dag_id)
url = urljoin(self._api_base_url, endpoint)
data = self._request(url, method='POST',
json={
"run_id": run_id,
"conf": conf,
"execution_date": execution_date,
})
return data['message']
在 python 中使用气流 API 的更多示例可在此处找到:
https://github.com/apache/airflow/blob/master/airflow/api/client/json_client.py
我在尝试做同样的事情时发现了这个 post,经过进一步调查,我切换到 ArgoEvents。它基本相同,但基于 event-driven 流程,因此它更适合此用例。
Link:
https://argoproj.github.io/argo
Airflow 现在支持 stable REST API。使用稳定的 REST API,您可以触发 DAG 为:
curl --location --request POST 'localhost:8080/api/v1/dags/unpublished/dagRuns' \
--header 'Content-Type: application/json' \
--header 'Authorization: Basic YWRtaW46YWRtaW4=' \
--data-raw '{
"dag_run_id": "dag_run_1",
"conf": {
"key": "value"
}
}'
我想知道是否可以在通过 HTTP 收到请求后执行气流任务。我对 Airflow 的调度部分不感兴趣。我只是想用它来代替芹菜。
所以一个示例操作应该是这样的。
- 用户提交了一个请求报告的表单。
- 后端收到请求并向用户发送请求已收到的通知。
- 然后后端立即使用 Airflow 安排作业 运行。
- Airflow 然后执行一系列与 DAG 关联的任务。比如先从redshift拉取数据,再从MySQL拉取数据,对两个结果集做一些操作,合并后将结果上传到Amazon S3,发邮件。[=25=]
根据我在网上阅读的内容,您可以通过在命令行上执行 airflow ...
来 运行 airflow 作业。我想知道是否有一个 python api 可以执行同样的事情。
谢谢。
您应该根据自己的需要查看 Airflow HTTP Sensor。您可以使用它来触发 dag。
Airflow REST API Plugin 会在这里帮助您。按照说明安装插件后,您只需点击以下 url:http://{HOST}:{PORT}/admin/rest_api/api/v1.0/trigger_dag?dag_id={dag_id}&run_id={run_id}&conf={url_encoded_json_parameters}
,将 dag_id 替换为您的 dag 的 ID,或者省略 run_id或者指定一个唯一的 id,并为 conf 传递一个 url 编码的 json(在触发的 dag 中包含您需要的任何参数)。
这是一个示例 JavaScript 函数,它使用 jQuery 调用 Airflow api:
function triggerDag(dagId, dagParameters){
var urlEncodedParameters = encodeURIComponent(dagParameters);
var dagRunUrl = "http://airflow:8080/admin/rest_api/api/v1.0/trigger_dag?dag_id="+dagId+"&conf="+urlEncodedParameters;
$.ajax({
url: dagRunUrl,
dataType: "json",
success: function(msg) {
console.log('Successfully started the dag');
},
error: function(e){
console.log('Failed to start the dag');
}
});
}
airflow 中的一个新选项是 实验性的 ,但在 1.7 和 1.8 的最新版本中是内置的 API 端点。这允许您 运行 气流服务器上的 REST 服务来侦听端口并接受 cli 作业。
我自己的经验有限,但我有 运行 次测试成功。根据文档:
/api/experimental/dags/<DAG_ID>/dag_runs
为给定的 dag id (POST) 创建一个 dag_run。
这将立即安排 运行 您想要 运行 的任何日期。不过,它仍然使用调度程序,等待心跳以查看 dag 运行ning 并将任务传递给 worker。不过,这与 CLI 的行为完全相同,所以我仍然相信它适合您的用例。
有关如何配置它的文档可在此处获得:https://airflow.apache.org/api.html
githubairflow/api/clients
下也有一些简单的示例客户端Airflow 的实验性 REST API 接口可用于此目的。
以下请求将触发 DAG:
curl -X POST \
http://<HOST>:8080/api/experimental/dags/process_data/dag_runs \
-H 'Cache-Control: no-cache' \
-H 'Content-Type: application/json' \
-d '{"conf":"{\"START_DATE\":\"2018-06-01 03:00:00\", \"STOP_DATE\":\"2018-06-01 23:00:00\"}'
以下请求检索特定 DAG ID 的 Dag 运行列表:
curl -i -H "Accept: application/json" -H "Content-Type: application/json" -X GET http://<HOST>:8080/api/experimental/dags/process_data/dag_runs
要使 GET API 正常工作,请将 rbac
标志设置为 True
,位于 airflow.cfg
。
更新: 稳定的 Airflow REST API 已发布: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
几乎所有内容都保持不变,除了 API URL 变化。 此外,“conf”现在需要成为一个对象,所以我添加了额外的包装:
def trigger_dag_v2(self, dag_id, run_id=None, conf=None, execution_date=None):
endpoint = '/api/v1/dags/{}/dagRuns'.format(dag_id)
url = urljoin(self._api_base_url, endpoint)
data = self._request(url, method='POST',
json={
"run_id": run_id,
"conf": {'conf': json.dumps(event)},
"execution_date": execution_date,
})
return data['message']
旧答案:
Airflow 具有 REST API(目前处于试验阶段)- 可在此处获取: https://airflow.apache.org/api.html#endpoints
如果您不想按照其他答案中的建议安装插件 - 这是代码,您可以如何直接使用 API:
def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
endpoint = '/api/experimental/dags/{}/dag_runs'.format(dag_id)
url = urljoin(self._api_base_url, endpoint)
data = self._request(url, method='POST',
json={
"run_id": run_id,
"conf": conf,
"execution_date": execution_date,
})
return data['message']
在 python 中使用气流 API 的更多示例可在此处找到: https://github.com/apache/airflow/blob/master/airflow/api/client/json_client.py
我在尝试做同样的事情时发现了这个 post,经过进一步调查,我切换到 ArgoEvents。它基本相同,但基于 event-driven 流程,因此它更适合此用例。 Link: https://argoproj.github.io/argo
Airflow 现在支持 stable REST API。使用稳定的 REST API,您可以触发 DAG 为:
curl --location --request POST 'localhost:8080/api/v1/dags/unpublished/dagRuns' \
--header 'Content-Type: application/json' \
--header 'Authorization: Basic YWRtaW46YWRtaW4=' \
--data-raw '{
"dag_run_id": "dag_run_1",
"conf": {
"key": "value"
}
}'