Snowflake - 在 Airflow 中执行查询的输出

Snowflake - Execute the output of a query in Airflow

我有一个创建外键的查询 (foreign_keys.sql),输出是用于添加 FK 的 ALTER 语句行,但我如何执行这些语句?

示例行:

ALTER TABLE "EIV"."RTR"."LINEITEMS" 
ADD FOREIGN KEY (ITEM_ID) REFERENCES "EIV"."RTR"."ID_LINEITEMS" (ID);

下面如果我将如何 运行 在 Airflow 中执行此操作,那么我该如何执行这些语句?

snp_create_foreign_keys = SnowflakeQueryOperator(
        task_id='create_foreign_keys',
        sql='queries/foreign_keys.sql',
        params={
            'schema': 'qtr'
        },
        retries=0)

这就是我们的 SnowflakeQueryOperator 的样子:

class SnowflakeQueryOperator(BaseOperator):
    template_fields = ['sql', 'params']
    template_ext = ['.sql']

    @apply_defaults
    def __init__(self,
                 sql,
                 params=None,
                 warehouse=Variable.get('default_snowflake_warehouse'),
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.sql = sql
        self.params = params
        self.warehouse = warehouse

    def execute(self, context):
        sf_hook = SnowflakeHook(warehouse=self.warehouse)

        self.log.info(f'Running query:')

        sf_hook.execute_query(self.sql)

据我了解,Airflow 中的 SnowflakeOperator 不会 return select 查询的结果,它应该只用于在 Snowflake 上执行查询(像大多数数据库操作符一样)并且要么失败,要么成功。

您需要编写自己的运算符来执行此操作。