Python 和 Snowflake:使用 SQLAlchemy 引擎将新数据追加到现有表时,报错"当前会话没有当前数据库"
Python and Snowflake appending new data into an existing table using SQL Alchemy engine is returning current session does not have a current database
我需要将一些新数据追加到 Snowflake 上的现有表中。我使用 sqlalchemy 作为引擎,并使用 pandas DataFrame 的 to_sql() 方法。以下是导入和脚本:
import pandas as pd
import os
import snowflake.connector as snowCtx
import getpass
import json
import numpy as np
from datetime import date, datetime
import time
from sqlalchemy import create_engine
from sqlalchemy.dialects import registry
import snowflake.sqlalchemy
from snowflake.connector.pandas_tools import pd_writer
from sqlalchemy.ext.declarative import declarative_base
# Register the Snowflake dialect under the 'snowflake' name in SQLAlchemy's
# dialect registry.
registry.register('snowflake', 'snowflake.sqlalchemy', 'dialect')

# Build one row per column of `data`, then attach the survey id held in
# `nextval`. `columns` is passed as a list: a set has no defined order and is
# not an accepted type for this argument in current pandas releases.
columns_df = pd.DataFrame(data.columns.to_list(), columns=['survey_column_name'])
columns_df['survey_id'] = nextval
columns_df = columns_df[['survey_id', 'survey_column_name']]

# Append the rows to the existing table through the SQLAlchemy engine, using
# the insertion method supplied by the Snowflake connector (pd_writer).
columns_df.to_sql(
    'SURVEY_METADATA_COLUMN_NAMES',
    con=engine,
    schema='PUBLIC',
    if_exists='append',
    index=False,
    index_label=None,
    chunksize=300,
    method=pd_writer,
)
我得到的错误如下:
ProgrammingError: (snowflake.connector.errors.ProgrammingError) 090105
(22000): Cannot perform CREATE
TABLE. This session does not have a current database. Call 'USE
DATABASE', or use a qualified name. [SQL:
CREATE TABLE "PUBLIC"."SURVEY_METADATA_COLUMN_NAMES" (
survey_id INTEGER,
survey_column_name TEXT )
]
连接如下:
# Credentials are prompted for at run time; getpass reads the password
# without echoing it.
user = input('Your Snowflake username: ')
password = getpass.getpass('Your Snowflake Password: ')
account = 'MY_ACCOUNT'

# Direct connector session with database, schema, warehouse and role set
# explicitly.
conn = snowCtx.connect(
    user=user,
    password=password,
    account=account,
    database='MY_DB',
    schema='PUBLIC',
    warehouse='COMPUTE_WH',
    role='SYSADMIN',
)

# SQLAlchemy engine. Every keyword handed to format() needs a matching
# placeholder in the URL template: a template that stops at '{account}/'
# silently discards database, schema, warehouse and role, which leaves the
# session without a current database (error 090105).
# NOTE: a password containing URL-special characters must be escaped before
# it is interpolated here.
engine = create_engine(
    'snowflake://{user}:{password}@{account}/{database}/{schema}'
    '?warehouse={warehouse}&role={role}'
    '&cache_column_metadata={cache_column_metadata}'.format(
        user=user,
        password=password,
        account=account,
        database='MY_DB',
        schema='PUBLIC',
        warehouse='COMPUTE_WH',
        role='SYSADMIN',
        cache_column_metadata=True,
    )
)
我改为使用 write_pandas():
# Load the DataFrame into the table through the connector session `conn`.
# The first three returned values are kept; the fourth is unused here.
success, nchunks, nrows, _ = write_pandas(
    conn,
    columns_df,
    'SURVEY_METADATA_COLUMN_NAMES',
    chunk_size=300,
    schema='PUBLIC',
)
print(success, nchunks, nrows)

# Report the outcome for the file being processed.
if success:
    print(filename+' columns uploaded')
else:
    print(filename+' columns were not uploaded')
它需要 pyarrow 库,所以我安装了它:
pip install pyarrow
我删除了与 sqlalchemy 相关的所有导入,并保留了以下内容:
import pandas as pd
import os
import snowflake.connector as snowCtx
import getpass
import json
import numpy as np
from datetime import date, datetime
import time
from snowflake.connector.pandas_tools import write_pandas
我需要将一些新数据追加到 Snowflake 上的现有表中。我使用 sqlalchemy 作为引擎,并使用 pandas DataFrame 的 to_sql() 方法。以下是导入和脚本:
import pandas as pd
import os
import snowflake.connector as snowCtx
import getpass
import json
import numpy as np
from datetime import date, datetime
import time
from sqlalchemy import create_engine
from sqlalchemy.dialects import registry
import snowflake.sqlalchemy
from snowflake.connector.pandas_tools import pd_writer
from sqlalchemy.ext.declarative import declarative_base
# Register the Snowflake dialect under the 'snowflake' name in SQLAlchemy's
# dialect registry.
registry.register('snowflake', 'snowflake.sqlalchemy', 'dialect')

# Build one row per column of `data`, then attach the survey id held in
# `nextval`. `columns` is passed as a list: a set has no defined order and is
# not an accepted type for this argument in current pandas releases.
columns_df = pd.DataFrame(data.columns.to_list(), columns=['survey_column_name'])
columns_df['survey_id'] = nextval
columns_df = columns_df[['survey_id', 'survey_column_name']]

# Append the rows to the existing table through the SQLAlchemy engine, using
# the insertion method supplied by the Snowflake connector (pd_writer).
columns_df.to_sql(
    'SURVEY_METADATA_COLUMN_NAMES',
    con=engine,
    schema='PUBLIC',
    if_exists='append',
    index=False,
    index_label=None,
    chunksize=300,
    method=pd_writer,
)
我得到的错误如下:
ProgrammingError: (snowflake.connector.errors.ProgrammingError) 090105 (22000): Cannot perform CREATE
TABLE. This session does not have a current database. Call 'USE DATABASE', or use a qualified name. [SQL:
CREATE TABLE "PUBLIC"."SURVEY_METADATA_COLUMN_NAMES" (
survey_id INTEGER,
survey_column_name TEXT )
]
连接如下:
# Credentials are prompted for at run time; getpass reads the password
# without echoing it.
user = input('Your Snowflake username: ')
password = getpass.getpass('Your Snowflake Password: ')
account = 'MY_ACCOUNT'

# Direct connector session with database, schema, warehouse and role set
# explicitly.
conn = snowCtx.connect(
    user=user,
    password=password,
    account=account,
    database='MY_DB',
    schema='PUBLIC',
    warehouse='COMPUTE_WH',
    role='SYSADMIN',
)

# SQLAlchemy engine. Every keyword handed to format() needs a matching
# placeholder in the URL template: a template that stops at '{account}/'
# silently discards database, schema, warehouse and role, which leaves the
# session without a current database (error 090105).
# NOTE: a password containing URL-special characters must be escaped before
# it is interpolated here.
engine = create_engine(
    'snowflake://{user}:{password}@{account}/{database}/{schema}'
    '?warehouse={warehouse}&role={role}'
    '&cache_column_metadata={cache_column_metadata}'.format(
        user=user,
        password=password,
        account=account,
        database='MY_DB',
        schema='PUBLIC',
        warehouse='COMPUTE_WH',
        role='SYSADMIN',
        cache_column_metadata=True,
    )
)
我改为使用 write_pandas():
# Load the DataFrame into the table through the connector session `conn`.
# The first three returned values are kept; the fourth is unused here.
success, nchunks, nrows, _ = write_pandas(
    conn,
    columns_df,
    'SURVEY_METADATA_COLUMN_NAMES',
    chunk_size=300,
    schema='PUBLIC',
)
print(success, nchunks, nrows)

# Report the outcome for the file being processed.
if success:
    print(filename+' columns uploaded')
else:
    print(filename+' columns were not uploaded')
它需要 pyarrow 库,所以我安装了它:
pip install pyarrow
我删除了与 sqlalchemy 相关的所有导入,并保留了以下内容:
import pandas as pd
import os
import snowflake.connector as snowCtx
import getpass
import json
import numpy as np
from datetime import date, datetime
import time
from snowflake.connector.pandas_tools import write_pandas