Python SnowflakeOperator 设置 snowflake_default
Python SnowflakeOperator setup snowflake_default
美好的一天,我找不到如何进行基本设置以airflow.contrib.operators.snowflake_operator.SnowflakeOperator
连接到雪花。 snowflake.connector.connect
工作正常。
当我使用 SnowflakeOperator
时:
op = snowflake_operator.SnowflakeOperator(sql = "create table test(*****)", task_id = '123')
我得到
airflow.exceptions.AirflowException: The conn_id
snowflake_defaultisn't defined
我尝试在后端 sqlite 数据库中插入
INSERT INTO connection(
conn_id, conn_type, host
, schema, login, password
, port, is_encrypted, is_extra_encrypted
) VALUES (*****)
但之后我得到一个错误:
snowflake.connector.errors.ProgrammingError: 251001: None: Account must be specified
。
将 account
kwarg 传递给 SnowflakeOperator
构造函数没有帮助。似乎我无法将帐户传递给数据库或构造函数,但这是必需的。
请帮助我,让我知道什么数据我应该插入后端本地数据库以便能够通过[=14连接=]
转到管理 -> 连接并像这样更新 snowflake_default
连接:
基于源代码 airflow/contrib/hooks/snowflake_hook.py:53
我们需要像这样添加额外内容:
{
"schema": "schema",
"database": "database",
"account": "account",
"warehouse": "warehouse"
}
在此背景下:
$ airflow version
2.2.3
$ pip install snowflake-connector-python==2.4.1
$ pip install apache-airflow-providers-snowflake==2.5.0
您必须像这样指定两次 Snowflake 帐户和 Snowflake 区域:
airflow connections add 'my_snowflake_db' \
--conn-type 'snowflake' \
--conn-login 'my_user' \
--conn-password 'my_password' \
--conn-port 443 \
--conn-schema 'public' \
--conn-host 'my_account_xyz.my_region_abc.snowflakecomputing.com' \
--conn-extra '{ "account": "my_account_xyz", "warehouse": "my_warehouse", "region": "my_region_abc" }'
否则它不会抛出 Python 异常:
snowflake.connector.errors.ProgrammingError: 251001: 251001: Account must be specified
我认为这可能是由于 airflow
命令参数 --conn-host
需要一个带有子域的完整域(my_account_xyz.my_region_abc
),通常将 Snowflake 指定为查询参数以类似于此模板的方式(尽管我没有检查命令 airflow connections add
和 DAG 执行的所有组合):
"snowflake://{user}:{password}@{account}{region}{cloud}/{database}/{schema}?role={role}&warehouse={warehouse}&timezone={timezone}"
然后像这样的虚拟 Snowflake DAG SELECT 1;
将找到通往 Snowflake 云服务的自己的方式并起作用:
import datetime
from datetime import timedelta
from airflow.models import DAG
# https://airflow.apache.org/docs/apache-airflow-providers-snowflake/stable/operators/snowflake.html
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
my_dag = DAG(
"example_snowflake",
start_date=datetime.datetime.utcnow(),
default_args={"snowflake_conn_id": "my_snowflake_db"},
schedule_interval="0 0 1 * *",
tags=["example"],
catchup=False,
dagrun_timeout=timedelta(minutes=10),
)
sf_task_1 = SnowflakeOperator(
task_id="sf_task_1",
dag=my_dag,
sql="SELECT 1;",
)
美好的一天,我找不到如何进行基本设置以airflow.contrib.operators.snowflake_operator.SnowflakeOperator
连接到雪花。 snowflake.connector.connect
工作正常。
当我使用 SnowflakeOperator
时:
op = snowflake_operator.SnowflakeOperator(sql = "create table test(*****)", task_id = '123')
我得到
airflow.exceptions.AirflowException: The conn_id
snowflake_defaultisn't defined
我尝试在后端 sqlite 数据库中插入
INSERT INTO connection(
conn_id, conn_type, host
, schema, login, password
, port, is_encrypted, is_extra_encrypted
) VALUES (*****)
但之后我得到一个错误:
snowflake.connector.errors.ProgrammingError: 251001: None: Account must be specified
。
将 account
kwarg 传递给 SnowflakeOperator
构造函数没有帮助。似乎我无法将帐户传递给数据库或构造函数,但这是必需的。
请帮助我,让我知道什么数据我应该插入后端本地数据库以便能够通过[=14连接=]
转到管理 -> 连接并像这样更新 snowflake_default
连接:
基于源代码 airflow/contrib/hooks/snowflake_hook.py:53
我们需要像这样添加额外内容:
{
"schema": "schema",
"database": "database",
"account": "account",
"warehouse": "warehouse"
}
在此背景下:
$ airflow version
2.2.3
$ pip install snowflake-connector-python==2.4.1
$ pip install apache-airflow-providers-snowflake==2.5.0
您必须像这样指定两次 Snowflake 帐户和 Snowflake 区域:
airflow connections add 'my_snowflake_db' \
--conn-type 'snowflake' \
--conn-login 'my_user' \
--conn-password 'my_password' \
--conn-port 443 \
--conn-schema 'public' \
--conn-host 'my_account_xyz.my_region_abc.snowflakecomputing.com' \
--conn-extra '{ "account": "my_account_xyz", "warehouse": "my_warehouse", "region": "my_region_abc" }'
否则它不会抛出 Python 异常:
snowflake.connector.errors.ProgrammingError: 251001: 251001: Account must be specified
我认为这可能是由于 airflow
命令参数 --conn-host
需要一个带有子域的完整域(my_account_xyz.my_region_abc
),通常将 Snowflake 指定为查询参数以类似于此模板的方式(尽管我没有检查命令 airflow connections add
和 DAG 执行的所有组合):
"snowflake://{user}:{password}@{account}{region}{cloud}/{database}/{schema}?role={role}&warehouse={warehouse}&timezone={timezone}"
然后像这样的虚拟 Snowflake DAG SELECT 1;
将找到通往 Snowflake 云服务的自己的方式并起作用:
import datetime
from datetime import timedelta
from airflow.models import DAG
# https://airflow.apache.org/docs/apache-airflow-providers-snowflake/stable/operators/snowflake.html
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
my_dag = DAG(
"example_snowflake",
start_date=datetime.datetime.utcnow(),
default_args={"snowflake_conn_id": "my_snowflake_db"},
schedule_interval="0 0 1 * *",
tags=["example"],
catchup=False,
dagrun_timeout=timedelta(minutes=10),
)
sf_task_1 = SnowflakeOperator(
task_id="sf_task_1",
dag=my_dag,
sql="SELECT 1;",
)