使用 Snowflake 和 Apache Airflow,如何使用 Sql Sensor 更改连接的数据库或架构?
Using Snowflake & Apache Airflow, how can I change DATABASE or SCHEMA for a connection using SqlSensor?
我是 Snowflake 的新手(并在 Apache Airflow 中使用它)。我正在尝试将 SqlSensor 与它一起使用。
我发现 Snowflake 驱动程序不允许多语句 SQL。 SnowflakeOperator 似乎通过拆分解决了这个问题;并一次执行一个语句。
Snowflake 是 SqlOperator 的有效连接类型。但是,我还没有注意到专门针对 Snowflake 的 SqlSensor。 SqlOperator 不会像 SnowflakeOperator 那样拆分命令。因此,似乎每个用于传感的查询都必须从一开始就在正确的数据库和模式上建立,即没有 USE DATABASE 类型的命令 - 否则它是多语句并且传感器失败。
我是否需要为我可能选择感测的每个模式和数据库建立单独的气流连接?或者是否有另一种方法可以在运行时使用 SqlSensor 指定数据库、架构等?
SqlSensor
旨在接受单个查询。
我假设您正在尝试 运行 查询,例如:
USE DATABASE my_database;
USE SCHEMA my_schema;
SELECT ...
这是行不通的。目前没有开箱即用的解决方案。 PR 正在通过公开允许传递参数的 SqlSensor
的底层挂钩来处理它。
但是您仍然可以使用以下方法之一解决您的问题:
- 定义另一个 Snowflake 连接,您不需要在查询中更改其属性,因此您将 运行 只有一个 SELECT 语句。
- 创建自定义
SnowflakeSensor
。
我没有测试过,但总体思路是这样的
from airflow.sensors.sql import SqlSensor
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
class SnowflakeSensor(SqlSensor):
def __init__(
self, **kwargs
):
self.account = kwargs.pop("account", None)
self.warehouse = kwargs.pop("warehouse", None)
self.database = kwargs.pop("database", None)
self.region = kwargs.pop("region", None)
self.role = kwargs.pop("role", None)
self.schema = kwargs.pop("schema", None)
self.authenticator = kwargs.pop("authenticator", None)
self.session_parameters = kwargs.pop("session_parameters", None)
super().__init__(**kwargs)
def _get_hook(self):
return SnowflakeHook(
snowflake_conn_id=self.conn_id,
warehouse=self.warehouse,
database=self.database,
role=self.role,
schema=self.schema,
authenticator=self.authenticator,
session_parameters=self.session_parameters,
)
此代码所做的是 SqlSensor
所做的一切,不同之处在于它还通过覆盖 _get_hook
函数接受 Snowflake 特定参数。
我是 Snowflake 的新手(并在 Apache Airflow 中使用它)。我正在尝试将 SqlSensor 与它一起使用。
我发现 Snowflake 驱动程序不允许多语句 SQL。 SnowflakeOperator 似乎通过拆分解决了这个问题;并一次执行一个语句。
Snowflake 是 SqlOperator 的有效连接类型。但是,我还没有注意到专门针对 Snowflake 的 SqlSensor。 SqlOperator 不会像 SnowflakeOperator 那样拆分命令。因此,似乎每个用于传感的查询都必须从一开始就在正确的数据库和模式上建立,即没有 USE DATABASE 类型的命令 - 否则它是多语句并且传感器失败。
我是否需要为我可能选择感测的每个模式和数据库建立单独的气流连接?或者是否有另一种方法可以在运行时使用 SqlSensor 指定数据库、架构等?
SqlSensor
旨在接受单个查询。
我假设您正在尝试 运行 查询,例如:
USE DATABASE my_database;
USE SCHEMA my_schema;
SELECT ...
这是行不通的。目前没有开箱即用的解决方案。 PR 正在通过公开允许传递参数的 SqlSensor
的底层挂钩来处理它。
但是您仍然可以使用以下方法之一解决您的问题:
- 定义另一个 Snowflake 连接,您不需要在查询中更改其属性,因此您将 运行 只有一个 SELECT 语句。
- 创建自定义
SnowflakeSensor
。
我没有测试过,但总体思路是这样的
from airflow.sensors.sql import SqlSensor
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
class SnowflakeSensor(SqlSensor):
def __init__(
self, **kwargs
):
self.account = kwargs.pop("account", None)
self.warehouse = kwargs.pop("warehouse", None)
self.database = kwargs.pop("database", None)
self.region = kwargs.pop("region", None)
self.role = kwargs.pop("role", None)
self.schema = kwargs.pop("schema", None)
self.authenticator = kwargs.pop("authenticator", None)
self.session_parameters = kwargs.pop("session_parameters", None)
super().__init__(**kwargs)
def _get_hook(self):
return SnowflakeHook(
snowflake_conn_id=self.conn_id,
warehouse=self.warehouse,
database=self.database,
role=self.role,
schema=self.schema,
authenticator=self.authenticator,
session_parameters=self.session_parameters,
)
此代码所做的是 SqlSensor
所做的一切,不同之处在于它还通过覆盖 _get_hook
函数接受 Snowflake 特定参数。