Python SnowflakeOperator 设置 snowflake_default

Python SnowflakeOperator setup snowflake_default

美好的一天,我找不到如何进行基本设置以airflow.contrib.operators.snowflake_operator.SnowflakeOperator连接到雪花。 snowflake.connector.connect 工作正常。

当我使用 SnowflakeOperator 时:

op = snowflake_operator.SnowflakeOperator(sql = "create table test(*****)", task_id = '123')

我得到

airflow.exceptions.AirflowException: The conn_idsnowflake_defaultisn't defined

我尝试在后端 sqlite 数据库中插入

INSERT INTO connection( conn_id, conn_type, host , schema, login, password , port, is_encrypted, is_extra_encrypted ) VALUES (*****)

但之后我得到一个错误:

snowflake.connector.errors.ProgrammingError: 251001: None: Account must be specified

account kwarg 传递给 SnowflakeOperator 构造函数没有帮助。似乎我无法将帐户传递给数据库或构造函数,但这是必需的。

请帮助我,让我知道什么数据我应该插入后端本地数据库以便能够通过[=14连接=]

转到管理 -> 连接并像这样更新 snowflake_default 连接:

基于源代码 airflow/contrib/hooks/snowflake_hook.py:53 我们需要像这样添加额外内容:

{
    "schema": "schema",
    "database": "database",
    "account": "account",
    "warehouse": "warehouse"
}

在此背景下:

$ airflow version
2.2.3
$ pip install snowflake-connector-python==2.4.1
$ pip install apache-airflow-providers-snowflake==2.5.0

您必须像这样指定两次 Snowflake 帐户和 Snowflake 区域:

airflow connections add 'my_snowflake_db' \
    --conn-type 'snowflake' \
    --conn-login 'my_user' \
    --conn-password 'my_password' \
    --conn-port 443 \
    --conn-schema 'public' \
    --conn-host 'my_account_xyz.my_region_abc.snowflakecomputing.com' \
    --conn-extra '{ "account": "my_account_xyz", "warehouse": "my_warehouse", "region": "my_region_abc" }'

否则它不会抛出 Python 异常:

snowflake.connector.errors.ProgrammingError: 251001: 251001: Account must be specified

我认为这可能是由于 airflow 命令参数 --conn-host 需要一个带有子域的完整域(my_account_xyz.my_region_abc),通常将 Snowflake 指定为查询参数以类似于此模板的方式(尽管我没有检查命令 airflow connections add 和 DAG 执行的所有组合):

"snowflake://{user}:{password}@{account}{region}{cloud}/{database}/{schema}?role={role}&warehouse={warehouse}&timezone={timezone}"

然后像这样的虚拟 Snowflake DAG SELECT 1; 将找到通往 Snowflake 云服务的自己的方式并起作用:

import datetime
from datetime import timedelta

from airflow.models import DAG

# https://airflow.apache.org/docs/apache-airflow-providers-snowflake/stable/operators/snowflake.html
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

my_dag = DAG(
    "example_snowflake",
    start_date=datetime.datetime.utcnow(),
    default_args={"snowflake_conn_id": "my_snowflake_db"},
    schedule_interval="0 0 1 * *",
    tags=["example"],
    catchup=False,
    dagrun_timeout=timedelta(minutes=10),
)

sf_task_1 = SnowflakeOperator(
    task_id="sf_task_1",
    dag=my_dag,
    sql="SELECT 1;",
)