运行 基于 Web 请求的 Airflow 作业

Running Job On Airflow Based On Webrequest

我想知道是否可以在通过 HTTP 收到请求后执行气流任务。我对 Airflow 的调度部分不感兴趣。我只是想用它来代替芹菜。

所以一个示例操作应该是这样的。

  1. 用户提交了一个请求报告的表单。
  2. 后端收到请求并向用户发送请求已收到的通知。
  3. 然后后端立即使用 Airflow 安排作业 运行。
  4. Airflow 然后执行一系列与 DAG 关联的任务。比如先从redshift拉取数据,再从MySQL拉取数据,对两个结果集做一些操作,合并后将结果上传到Amazon S3,发邮件。[​​=25=]

根据我在网上阅读的内容,您可以通过在命令行上执行 airflow ... 来 运行 airflow 作业。我想知道是否有一个 python api 可以执行同样的事情。

谢谢。

您应该根据自己的需要查看 Airflow HTTP Sensor。您可以使用它来触发 dag。

Airflow REST API Plugin 会在这里帮助您。按照说明安装插件后,您只需点击以下 url:http://{HOST}:{PORT}/admin/rest_api/api/v1.0/trigger_dag?dag_id={dag_id}&run_id={run_id}&conf={url_encoded_json_parameters},将 dag_id 替换为您的 dag 的 ID,或者省略 run_id或者指定一个唯一的 id,并为 conf 传递一个 url 编码的 json(在触发的 dag 中包含您需要的任何参数)。

这是一个示例 JavaScript 函数,它使用 jQuery 调用 Airflow api:

function triggerDag(dagId, dagParameters){
    var urlEncodedParameters = encodeURIComponent(dagParameters);
    var dagRunUrl = "http://airflow:8080/admin/rest_api/api/v1.0/trigger_dag?dag_id="+dagId+"&conf="+urlEncodedParameters;
    $.ajax({
        url: dagRunUrl,
        dataType: "json",
        success: function(msg) {
            console.log('Successfully started the dag');
        },
        error: function(e){
           console.log('Failed to start the dag');
        }
    });
}

airflow 中的一个新选项是 实验性的 ,但在 1.7 和 1.8 的最新版本中是内置的 API 端点。这允许您 运行 气流服务器上的 REST 服务来侦听端口并接受 cli 作业。

我自己的经验有限,但我有 运行 次测试成功。根据文档:

/api/experimental/dags/<DAG_ID>/dag_runs 为给定的 dag id (POST) 创建一个 dag_run。

这将立即安排 运行 您想要 运行 的任何日期。不过,它仍然使用调度程序,等待心跳以查看 dag 运行ning 并将任务传递给 worker。不过,这与 CLI 的行为完全相同,所以我仍然相信它适合您的用例。

有关如何配置它的文档可在此处获得:https://airflow.apache.org/api.html

githubairflow/api/clients

下也有一些简单的示例客户端

Airflow 的实验性 REST API 接口可用于此目的。

以下请求将触发 DAG:

curl -X POST \
    http://<HOST>:8080/api/experimental/dags/process_data/dag_runs \
    -H 'Cache-Control: no-cache' \
    -H 'Content-Type: application/json' \
    -d '{"conf":"{\"START_DATE\":\"2018-06-01 03:00:00\", \"STOP_DATE\":\"2018-06-01 23:00:00\"}'

以下请求检索特定 DAG ID 的 Dag 运行列表:

curl -i -H "Accept: application/json" -H "Content-Type: application/json" -X GET http://<HOST>:8080/api/experimental/dags/process_data/dag_runs

要使 GET API 正常工作,请将 rbac 标志设置为 True,位于 airflow.cfg

以下是可用的 API 的列表:here & there

更新: 稳定的 Airflow REST API 已发布: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

几乎所有内容都保持不变,除了 API URL 变化。 此外,“conf”现在需要成为一个对象,所以我添加了额外的包装:

 def trigger_dag_v2(self, dag_id, run_id=None, conf=None, execution_date=None):
    endpoint = '/api/v1/dags/{}/dagRuns'.format(dag_id)
    url = urljoin(self._api_base_url, endpoint)
    data = self._request(url, method='POST',
                         json={
                             "run_id": run_id,
                             "conf": {'conf': json.dumps(event)},
                             "execution_date": execution_date,
                         })
    return data['message']

旧答案:

Airflow 具有 REST API(目前处于试验阶段)- 可在此处获取: https://airflow.apache.org/api.html#endpoints

如果您不想按照其他答案中的建议安装插件 - 这是代码,您可以如何直接使用 API:

def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None):
    endpoint = '/api/experimental/dags/{}/dag_runs'.format(dag_id)
    url = urljoin(self._api_base_url, endpoint)
    data = self._request(url, method='POST',
                         json={
                             "run_id": run_id,
                             "conf": conf,
                             "execution_date": execution_date,
                         })
    return data['message']

在 python 中使用气流 API 的更多示例可在此处找到: https://github.com/apache/airflow/blob/master/airflow/api/client/json_client.py

我在尝试做同样的事情时发现了这个 post,经过进一步调查,我切换到 ArgoEvents。它基本相同,但基于 event-driven 流程,因此它更适合此用例。 Link: https://argoproj.github.io/argo

Airflow 现在支持 stable REST API。使用稳定的 REST API,您可以触发 DAG 为:

curl --location --request POST 'localhost:8080/api/v1/dags/unpublished/dagRuns' \
--header 'Content-Type: application/json' \
--header 'Authorization: Basic YWRtaW46YWRtaW4=' \
--data-raw '{
    "dag_run_id": "dag_run_1",
    "conf": {
        "key": "value"
    }
}'