如何在 Airflow MsSqlOperator 中使用 mssql_conn_id?
How to use mssql_conn_id in Airflow MsSqlOperator?
我正在尝试在我的 Airflow 工作流程中使用 MsSqlOperator,但我不知道如何设置连接字符串。
我试过将 mssql_conn_id 设置为连接字符串本身
t2 = MsSqlOperator(
task_id='sql-op',
mssql_conn_id='sa:password@172.17.0.2',
sql='use results; insert into airflow value("airflow","out")',
dag=dag)
我收到错误
airflow.exceptions.AirflowException: The conn_id `sa:password@172.17.0.2` isn't defined
所以我想 mssql_conn_id 需要定义。某处。有什么想法吗?
我可以像这样使用 sqlalchemy 连接到 MS SQL 数据库:
params = urllib.quote_plus("DRIVER={ODBC Driver 13 for SQL Server};SERVER=172.17.0.2;UID=SA;PWD=password")
engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
conn = engine.connect()
所以我知道服务器已启动并且 运行。
mssql_conn_id
参数指的是气流数据库中的 connection 条目,而不是实际的连接 URI。
您有几个添加连接的选项:
- UI:在管理 -> 连接下
- 命令行:使用
airflow connections --add --conn-id my_mssql --conn_uri mssql+pyodbc://sa:password@172.17.0.2
- 环境变量:设置
AIRFLOW_CONN_MY_MSSQL=mssql+pyodbc://sa:password@172.17.0.2
然后只需在运算符中引用conn_id
:
t2 = MsSqlOperator(
task_id='sql-op',
mssql_conn_id='my_mssql',
sql='use results; insert into airflow value("airflow","out")',
dag=dag)
代码完成
进入 admin > Connection
并编辑 mssql_default
from airflow import DAG
from airflow.operators.mssql_operator import MsSqlOperator
default_arg = {'owner': 'airflow'
,'start_date': '2022-01-01'
}
dag = DAG(
'name_task',
default_args=default_arg,
description='description here',
schedule_interval=None,
catchup=False
)
task = MsSqlOperator(
task_id='task_test',
mssql_conn_id='mssql_default',
sql=f"create table abc (a int)",
autocommit=True,
database='DatabaseGilmar',
dag=dag
)
我正在尝试在我的 Airflow 工作流程中使用 MsSqlOperator,但我不知道如何设置连接字符串。
我试过将 mssql_conn_id 设置为连接字符串本身
t2 = MsSqlOperator(
task_id='sql-op',
mssql_conn_id='sa:password@172.17.0.2',
sql='use results; insert into airflow value("airflow","out")',
dag=dag)
我收到错误
airflow.exceptions.AirflowException: The conn_id `sa:password@172.17.0.2` isn't defined
所以我想 mssql_conn_id 需要定义。某处。有什么想法吗?
我可以像这样使用 sqlalchemy 连接到 MS SQL 数据库:
params = urllib.quote_plus("DRIVER={ODBC Driver 13 for SQL Server};SERVER=172.17.0.2;UID=SA;PWD=password")
engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)
conn = engine.connect()
所以我知道服务器已启动并且 运行。
mssql_conn_id
参数指的是气流数据库中的 connection 条目,而不是实际的连接 URI。
您有几个添加连接的选项:
- UI:在管理 -> 连接下
- 命令行:使用
airflow connections --add --conn-id my_mssql --conn_uri mssql+pyodbc://sa:password@172.17.0.2
- 环境变量:设置
AIRFLOW_CONN_MY_MSSQL=mssql+pyodbc://sa:password@172.17.0.2
然后只需在运算符中引用conn_id
:
t2 = MsSqlOperator(
task_id='sql-op',
mssql_conn_id='my_mssql',
sql='use results; insert into airflow value("airflow","out")',
dag=dag)
代码完成
进入 admin > Connection 并编辑 mssql_default
from airflow import DAG
from airflow.operators.mssql_operator import MsSqlOperator
default_arg = {'owner': 'airflow'
,'start_date': '2022-01-01'
}
dag = DAG(
'name_task',
default_args=default_arg,
description='description here',
schedule_interval=None,
catchup=False
)
task = MsSqlOperator(
task_id='task_test',
mssql_conn_id='mssql_default',
sql=f"create table abc (a int)",
autocommit=True,
database='DatabaseGilmar',
dag=dag
)