如何使用 AWS Lambda 禁用 Airflow DAG
How to disable Airflow DAGs with AWS Lambda
我需要使用 AWS Lambda 或其他方式禁用 Airflow DAG。我可以使用 python 代码来执行此操作吗?提前谢谢你。
您可以通过 Airflow REST API 暂停/取消暂停(pause/unpause)DAG。
相关端点是"更新 DAG"(update a DAG),即向以下地址发送 PATCH 请求:
https://airflow.apache.org/api/v1/dags/{dag_id}
请求体为:
{
"is_paused": true
}
此外,Airflow 还提供官方的 Python 客户端(python client),您可以用它与 API 进行交互。示例:
import time
import airflow_client.client
from airflow_client.client.api import dag_api
from airflow_client.client.model.dag import DAG
from airflow_client.client.model.error import Error
from pprint import pprint
# Build ONE configuration carrying both the API host and the HTTP basic
# authorization credentials. Assigning a second Configuration just for the
# credentials would replace the first one and lose its host setting.
# Names are spelled with the full module path because
# `import airflow_client.client` binds `airflow_client`, not `client`.
configuration = airflow_client.client.Configuration(
    host="http://localhost/api/v1",
    username='YOUR_USERNAME',
    password='YOUR_PASSWORD',
)

with airflow_client.client.ApiClient(configuration) as api_client:
    # Create an instance of the API class
    api_instance = dag_api.DAGApi(api_client)
    dag_id = "dag_id_example"  # str | The DAG ID.
    # Only is_paused is set on the request model; True pauses the DAG.
    dag = DAG(
        is_paused=True,
    )
    try:
        # Update a DAG
        api_response = api_instance.patch_dag(dag_id, dag)
        pprint(api_response)
    except airflow_client.client.ApiException as e:
        print("Exception when calling DAGApi->patch_dag: %s\n" % e)
你可以在该客户端的文档(doc)中看到完整的示例。
我需要使用 AWS Lambda 或其他方式禁用 Airflow DAG。我可以使用 python 代码来执行此操作吗?提前谢谢你。
您可以通过 Airflow REST API 暂停/取消暂停(pause/unpause)DAG。相关端点是"更新 DAG"(update a DAG),即向以下地址发送 PATCH 请求:
https://airflow.apache.org/api/v1/dags/{dag_id}
请求体为:
{
"is_paused": true
}
此外,Airflow 还提供官方的 Python 客户端(python client),您可以用它与 API 进行交互。示例:
import time
import airflow_client.client
from airflow_client.client.api import dag_api
from airflow_client.client.model.dag import DAG
from airflow_client.client.model.error import Error
from pprint import pprint
# Build ONE configuration carrying both the API host and the HTTP basic
# authorization credentials. Assigning a second Configuration just for the
# credentials would replace the first one and lose its host setting.
# Names are spelled with the full module path because
# `import airflow_client.client` binds `airflow_client`, not `client`.
configuration = airflow_client.client.Configuration(
    host="http://localhost/api/v1",
    username='YOUR_USERNAME',
    password='YOUR_PASSWORD',
)

with airflow_client.client.ApiClient(configuration) as api_client:
    # Create an instance of the API class
    api_instance = dag_api.DAGApi(api_client)
    dag_id = "dag_id_example"  # str | The DAG ID.
    # Only is_paused is set on the request model; True pauses the DAG.
    dag = DAG(
        is_paused=True,
    )
    try:
        # Update a DAG
        api_response = api_instance.patch_dag(dag_id, dag)
        pprint(api_response)
    except airflow_client.client.ApiException as e:
        print("Exception when calling DAGApi->patch_dag: %s\n" % e)
你可以在该客户端的文档(doc)中看到完整的示例。