在 Airflow DAG 中创建动态数据库连接
Create a dynamic database connection in Airflow DAG
我正在使用 Apache-Airflow 2.2.3,我知道我们可以通过 admin/connections 创建连接。但我正在尝试一种使用动态数据库服务器详细信息创建连接的方法。
我的数据库主机、用户、密码详细信息来自 DAGRun 输入配置,我需要读取数据并将其写入数据库。
您可以从 DAGRun 配置中读取连接详细信息:
# Say we gave input {"username": "foo", "password": "bar"}
from airflow.models.connection import Connection
def mytask(**context):
username = context["dag_run"].conf["username"]
password = context["dag_run"].conf["password"]
connection = Connection(login=username, password=password)
但是,Airflow 中的所有运算符(需要连接)都采用一个参数 conn_id
,该参数采用一个字符串来标识 metastore/env var/secrets 后端中的连接。目前无法提供连接对象。
因此,如果您实现自己的 Python 函数(并使用 PythonOperator 或 @task
装饰器)或实现自己的运算符,您应该能够创建连接对象并使用它执行任何逻辑。但是无法使用 Airflow 中的任何其他现有运算符。
我正在使用 Apache-Airflow 2.2.3,我知道我们可以通过 admin/connections 创建连接。但我正在尝试一种使用动态数据库服务器详细信息创建连接的方法。
我的数据库主机、用户、密码详细信息来自 DAGRun 输入配置,我需要读取数据并将其写入数据库。
您可以从 DAGRun 配置中读取连接详细信息:
# Say we gave input {"username": "foo", "password": "bar"}
from airflow.models.connection import Connection
def mytask(**context):
username = context["dag_run"].conf["username"]
password = context["dag_run"].conf["password"]
connection = Connection(login=username, password=password)
但是,Airflow 中的所有运算符(需要连接)都采用一个参数 conn_id
,该参数采用一个字符串来标识 metastore/env var/secrets 后端中的连接。目前无法提供连接对象。
因此,如果您实现自己的 Python 函数(并使用 PythonOperator 或 @task
装饰器)或实现自己的运算符,您应该能够创建连接对象并使用它执行任何逻辑。但是无法使用 Airflow 中的任何其他现有运算符。