在气流中的雪花运算符中传递雪花连接参数

pass snowflake connection parameters in snowflake operator in airflow

我们正在使用 AWS 管理的 apache airflow 2.0.2。我正在通过 dag 运行 配置传递雪花模式名称。我可以在 python 运算符中使用此架构名称,但不能在雪花运算符中使用。

下面是我的代码

def check_for_null_op(**kwargs):
    snowflake_schema_name = kwargs["database_schema"]
    print("printing schema name")
    print(snowflake_schema_name)
    dwh_hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN,schema=snowflake_schema_name)
    result = dwh_hook.get_first("CALL test_proc()")
    print("printing results")
    print(result[0])
        /*some more code*/


check_for_null_op = PythonOperator(
    task_id="check_for_null_op",
    python_callable=check_for_null_op,
    op_kwargs={
        "database_schema": "{{ dag_run.conf['database_schema'] }}",
    }
)

test_sp = SnowflakeOperator(
    task_id="test_sp",
    sql="CALL usp_check_for_null_op()",
    snowflake_conn_id=SNOWFLAKE_CONN,
    schema="{{ dag_run.conf['database_schema'] }}"
)

PythonOperator 步骤正常,但 SnowflakeOperator 步骤失败。 下面是 SNOWFLAKE_CONN

的示例值
snowflake://username:password@abcd.us-east-1.snowflakecomputing.com/?account=abcd&warehouse=WHS&database=DBS&region=us-east-1&role=ADD

在 test_sp 任务中遇到错误

SQL compilation error:Unknown function USP_CHECK_FOR_NULL_OP

但是如果我传递硬编码值来代替 {{ dag_run.conf['database_schema'] }}

,代码就可以正常工作

如何在 SnowflakeOperator 中使用雪花模式名称?

对于 SnowflakeOperatorschema 参数不是模板化字段——只有 sql 是——参见 here。运算符正在将 schema 的 Jinja 表达式读取为文字字符串“{{ dag_run.conf['database_schema'] }}”。

但是,像这样的东西应该可以工作:

test_sp = SnowflakeOperator(
    task_id="test_sp",
    sql="CALL {{ dag_run.conf['database_schema'] }}.usp_check_for_null_op()",
    snowflake_conn_id=SNOWFLAKE_CONN,
)