Airflow 中外部连接的连接池
Connection pooling for external connections in Airflow
我正在尝试为 Airflow 中创建的外部连接寻找一种连接池管理方法。
气流版本:2.1.0
Python版本:3.9.5
气流数据库:SQLite
创建的外部连接:MySQL 和 Snowflake
我知道 airflow.cfg 文件中有属性
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
但这些属性用于管理气流内部数据库,在我的例子中是 SQLite。
我很少有任务在 MySQL 和 Snowflake 中读取或写入数据。
snowflake_insert = SnowflakeOperator(
task_id='insert_snowflake',
dag=dag,
snowflake_conn_id=SNOWFLAKE_CONN_ID,
sql="Some Insert query",
warehouse=SNOWFLAKE_WAREHOUSE,
database=SNOWFLAKE_DATABASE,
schema=SNOWFLAKE_SCHEMA,
role=SNOWFLAKE_ROLE
)
和
insert_mysql_task = MySqlOperator(task_id='insert_record', mysql_conn_id='mysql_default', sql="some insert query", dag=dag)
正在从 MySQL
中读取数据
def get_records():
mysql_hook = MySqlHook(mysql_conn_id="mysql_default")
records = mysql_hook.get_records(sql=r"""Some select query""")
print(records)
我观察到的是为 Snowflake 的每个任务(同一个 dag 中有多个任务)创建了一个新会话,尚未为 MySQL.[=14= 验证相同]
有没有办法维护外部连接的连接池(在我的例子中是 Snowflake 和 MySQL)或任何其他方法 运行 同一会话中同一 DAG 中的所有查询?
谢谢
Airflow 提供使用 Pools 作为限制外部服务并发的方法。
您可以通过 UI 创建池:
菜单 -> 管理 -> 池
或 CLI :
airflow pools set NAME slots
池中的槽定义了可以运行并行使用资源的任务数量。如果池已满,任务将排队,直到打开插槽。
要在运算符中使用池,只需将 pool=Name
添加到运算符即可。
在您的情况下,假设 Pool
是使用名称 snowflake 创建的:
snowflake_insert = SnowflakeOperator(
task_id='insert_snowflake',
dag=dag,
snowflake_conn_id=SNOWFLAKE_CONN_ID,
sql="Some Insert query",
warehouse=SNOWFLAKE_WAREHOUSE,
database=SNOWFLAKE_DATABASE,
schema=SNOWFLAKE_SCHEMA,
role=SNOWFLAKE_ROLE,
pool='snowflake',
)
请注意,默认情况下,任务占用池中的 1 个槽位,但这是可配置的。如果使用 pool_slots
示例:
,则任务可能占用多个槽
snowflake_insert = SnowflakeOperator(
task_id='insert_snowflake',
...
pool='snowflake',
pool_slots=2,
)
我正在尝试为 Airflow 中创建的外部连接寻找一种连接池管理方法。
气流版本:2.1.0
Python版本:3.9.5
气流数据库:SQLite
创建的外部连接:MySQL 和 Snowflake
我知道 airflow.cfg 文件中有属性
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
但这些属性用于管理气流内部数据库,在我的例子中是 SQLite。
我很少有任务在 MySQL 和 Snowflake 中读取或写入数据。
snowflake_insert = SnowflakeOperator(
task_id='insert_snowflake',
dag=dag,
snowflake_conn_id=SNOWFLAKE_CONN_ID,
sql="Some Insert query",
warehouse=SNOWFLAKE_WAREHOUSE,
database=SNOWFLAKE_DATABASE,
schema=SNOWFLAKE_SCHEMA,
role=SNOWFLAKE_ROLE
)
和
insert_mysql_task = MySqlOperator(task_id='insert_record', mysql_conn_id='mysql_default', sql="some insert query", dag=dag)
正在从 MySQL
中读取数据def get_records():
mysql_hook = MySqlHook(mysql_conn_id="mysql_default")
records = mysql_hook.get_records(sql=r"""Some select query""")
print(records)
我观察到的是为 Snowflake 的每个任务(同一个 dag 中有多个任务)创建了一个新会话,尚未为 MySQL.[=14= 验证相同]
有没有办法维护外部连接的连接池(在我的例子中是 Snowflake 和 MySQL)或任何其他方法 运行 同一会话中同一 DAG 中的所有查询?
谢谢
Airflow 提供使用 Pools 作为限制外部服务并发的方法。
您可以通过 UI 创建池: 菜单 -> 管理 -> 池
或 CLI :
airflow pools set NAME slots
池中的槽定义了可以运行并行使用资源的任务数量。如果池已满,任务将排队,直到打开插槽。
要在运算符中使用池,只需将 pool=Name
添加到运算符即可。
在您的情况下,假设 Pool
是使用名称 snowflake 创建的:
snowflake_insert = SnowflakeOperator(
task_id='insert_snowflake',
dag=dag,
snowflake_conn_id=SNOWFLAKE_CONN_ID,
sql="Some Insert query",
warehouse=SNOWFLAKE_WAREHOUSE,
database=SNOWFLAKE_DATABASE,
schema=SNOWFLAKE_SCHEMA,
role=SNOWFLAKE_ROLE,
pool='snowflake',
)
请注意,默认情况下,任务占用池中的 1 个槽位,但这是可配置的。如果使用 pool_slots
示例:
snowflake_insert = SnowflakeOperator(
task_id='insert_snowflake',
...
pool='snowflake',
pool_slots=2,
)