在 Airflow DAG 中创建动态数据库连接

Create a dynamic database connection in Airflow DAG

我正在使用 Apache-Airflow 2.2.3,我知道我们可以通过 admin/connections 创建连接。但我正在尝试一种使用动态数据库服务器详细信息创建连接的方法。

我的数据库主机、用户、密码详细信息来自 DAGRun 输入配置,我需要读取数据并将其写入数据库。

您可以从 DAGRun 配置中读取连接详细信息:

# Say we gave input {"username": "foo", "password": "bar"}

from airflow.models.connection import Connection

def mytask(**context):
    username = context["dag_run"].conf["username"]
    password = context["dag_run"].conf["password"]
    connection = Connection(login=username, password=password)

但是,Airflow 中的所有运算符(需要连接)都采用一个参数 conn_id,该参数采用一个字符串来标识 metastore/env var/secrets 后端中的连接。目前无法提供连接对象。

因此,如果您实现自己的 Python 函数(并使用 PythonOperator 或 @task 装饰器)或实现自己的运算符,您应该能够创建连接对象并使用它执行任何逻辑。但是无法使用 Airflow 中的任何其他现有运算符。