Snowflake - 在 Airflow 中执行查询的输出
Snowflake - Execute the output of a query in Airflow
我有一个创建外键的查询 (foreign_keys.sql
),输出是用于添加 FK 的 ALTER 语句行,但我如何执行这些语句?
示例行:
ALTER TABLE "EIV"."RTR"."LINEITEMS"
ADD FOREIGN KEY (ITEM_ID) REFERENCES "EIV"."RTR"."ID_LINEITEMS" (ID);
下面如果我将如何 运行 在 Airflow 中执行此操作,那么我该如何执行这些语句?
snp_create_foreign_keys = SnowflakeQueryOperator(
task_id='create_foreign_keys',
sql='queries/foreign_keys.sql',
params={
'schema': 'qtr'
},
retries=0)
这就是我们的 SnowflakeQueryOperator 的样子:
class SnowflakeQueryOperator(BaseOperator):
template_fields = ['sql', 'params']
template_ext = ['.sql']
@apply_defaults
def __init__(self,
sql,
params=None,
warehouse=Variable.get('default_snowflake_warehouse'),
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.sql = sql
self.params = params
self.warehouse = warehouse
def execute(self, context):
sf_hook = SnowflakeHook(warehouse=self.warehouse)
self.log.info(f'Running query:')
sf_hook.execute_query(self.sql)
据我了解,Airflow 中的 SnowflakeOperator 不会 return select 查询的结果,它应该只用于在 Snowflake 上执行查询(像大多数数据库操作符一样)并且要么失败,要么成功。
您需要编写自己的运算符来执行此操作。
我有一个创建外键的查询 (foreign_keys.sql
),输出是用于添加 FK 的 ALTER 语句行,但我如何执行这些语句?
示例行:
ALTER TABLE "EIV"."RTR"."LINEITEMS"
ADD FOREIGN KEY (ITEM_ID) REFERENCES "EIV"."RTR"."ID_LINEITEMS" (ID);
下面如果我将如何 运行 在 Airflow 中执行此操作,那么我该如何执行这些语句?
snp_create_foreign_keys = SnowflakeQueryOperator(
task_id='create_foreign_keys',
sql='queries/foreign_keys.sql',
params={
'schema': 'qtr'
},
retries=0)
这就是我们的 SnowflakeQueryOperator 的样子:
class SnowflakeQueryOperator(BaseOperator):
template_fields = ['sql', 'params']
template_ext = ['.sql']
@apply_defaults
def __init__(self,
sql,
params=None,
warehouse=Variable.get('default_snowflake_warehouse'),
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.sql = sql
self.params = params
self.warehouse = warehouse
def execute(self, context):
sf_hook = SnowflakeHook(warehouse=self.warehouse)
self.log.info(f'Running query:')
sf_hook.execute_query(self.sql)
据我了解,Airflow 中的 SnowflakeOperator 不会 return select 查询的结果,它应该只用于在 Snowflake 上执行查询(像大多数数据库操作符一样)并且要么失败,要么成功。
您需要编写自己的运算符来执行此操作。