在气流中的雪花运算符中传递雪花连接参数
pass snowflake connection parameters in snowflake operator in airflow
我们正在使用 AWS 管理的 apache airflow 2.0.2。我正在通过 dag 运行 配置传递雪花模式名称。我可以在 python 运算符中使用此架构名称,但不能在雪花运算符中使用。
下面是我的代码
def check_for_null_op(**kwargs):
snowflake_schema_name = kwargs["database_schema"]
print("printing schema name")
print(snowflake_schema_name)
dwh_hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN,schema=snowflake_schema_name)
result = dwh_hook.get_first("CALL test_proc()")
print("printing results")
print(result[0])
/*some more code*/
check_for_null_op = PythonOperator(
task_id="check_for_null_op",
python_callable=check_for_null_op,
op_kwargs={
"database_schema": "{{ dag_run.conf['database_schema'] }}",
}
)
test_sp = SnowflakeOperator(
task_id="test_sp",
sql="CALL usp_check_for_null_op()",
snowflake_conn_id=SNOWFLAKE_CONN,
schema="{{ dag_run.conf['database_schema'] }}"
)
PythonOperator 步骤正常,但 SnowflakeOperator 步骤失败。
下面是 SNOWFLAKE_CONN
的示例值
snowflake://username:password@abcd.us-east-1.snowflakecomputing.com/?account=abcd&warehouse=WHS&database=DBS®ion=us-east-1&role=ADD
在 test_sp 任务中遇到错误
SQL compilation error:Unknown function USP_CHECK_FOR_NULL_OP
但是如果我传递硬编码值来代替 {{ dag_run.conf['database_schema'] }}
,代码就可以正常工作
如何在 SnowflakeOperator 中使用雪花模式名称?
对于 SnowflakeOperator
,schema
参数不是模板化字段——只有 sql
是——参见 here。运算符正在将 schema
的 Jinja 表达式读取为文字字符串“{{ dag_run.conf['database_schema'] }}”。
但是,像这样的东西应该可以工作:
test_sp = SnowflakeOperator(
task_id="test_sp",
sql="CALL {{ dag_run.conf['database_schema'] }}.usp_check_for_null_op()",
snowflake_conn_id=SNOWFLAKE_CONN,
)
我们正在使用 AWS 管理的 apache airflow 2.0.2。我正在通过 dag 运行 配置传递雪花模式名称。我可以在 python 运算符中使用此架构名称,但不能在雪花运算符中使用。
下面是我的代码
def check_for_null_op(**kwargs):
snowflake_schema_name = kwargs["database_schema"]
print("printing schema name")
print(snowflake_schema_name)
dwh_hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN,schema=snowflake_schema_name)
result = dwh_hook.get_first("CALL test_proc()")
print("printing results")
print(result[0])
/*some more code*/
check_for_null_op = PythonOperator(
task_id="check_for_null_op",
python_callable=check_for_null_op,
op_kwargs={
"database_schema": "{{ dag_run.conf['database_schema'] }}",
}
)
test_sp = SnowflakeOperator(
task_id="test_sp",
sql="CALL usp_check_for_null_op()",
snowflake_conn_id=SNOWFLAKE_CONN,
schema="{{ dag_run.conf['database_schema'] }}"
)
PythonOperator 步骤正常,但 SnowflakeOperator 步骤失败。 下面是 SNOWFLAKE_CONN
的示例值snowflake://username:password@abcd.us-east-1.snowflakecomputing.com/?account=abcd&warehouse=WHS&database=DBS®ion=us-east-1&role=ADD
在 test_sp 任务中遇到错误
SQL compilation error:Unknown function USP_CHECK_FOR_NULL_OP
但是如果我传递硬编码值来代替 {{ dag_run.conf['database_schema'] }}
如何在 SnowflakeOperator 中使用雪花模式名称?
对于 SnowflakeOperator
,schema
参数不是模板化字段——只有 sql
是——参见 here。运算符正在将 schema
的 Jinja 表达式读取为文字字符串“{{ dag_run.conf['database_schema'] }}”。
但是,像这样的东西应该可以工作:
test_sp = SnowflakeOperator(
task_id="test_sp",
sql="CALL {{ dag_run.conf['database_schema'] }}.usp_check_for_null_op()",
snowflake_conn_id=SNOWFLAKE_CONN,
)