Python 和 Snowflake 使用 SQL 将新数据附加到现有 table 中 Alchemy 引擎正在返回当前会话没有当前数据库

Python and Snowflake appending new data into an existing table using SQL Alchemy engine is returning current session does not have a current database

我需要将一些新数据附加到雪花上的现有 table 中。我使用 sqlalchemy 作为引擎以及 pandas 数据框 to_sql()。这是导入和脚本:

import pandas as pd
import os
import snowflake.connector as snowCtx
import getpass
import json
import numpy as np
from datetime import date, datetime
import time
from sqlalchemy import create_engine
from sqlalchemy.dialects import registry
import snowflake.sqlalchemy
from snowflake.connector.pandas_tools import pd_writer
from sqlalchemy.ext.declarative import declarative_base

registry.register('snowflake', 'snowflake.sqlalchemy', 'dialect')

columns_df = pd.DataFrame(data.columns.to_list(), columns={'survey_column_name'})
                        columns_df['survey_id'] = nextval
                        columns_df = columns_df[['survey_id', 'survey_column_name']]
                        columns_df.to_sql('SURVEY_METADATA_COLUMN_NAMES', 
                                         index = False,  
                                         index_label = None, 
                                         con = engine, 
                                         schema = 'PUBLIC', 
                                         if_exists = 'append', 
                                         chunksize = 300,
                                         method = pd_writer)

我得到的错误如下:

ProgrammingError: (snowflake.connector.errors.ProgrammingError) 090105 (22000): Cannot perform CREATE

TABLE. This session does not have a current database. Call 'USE DATABASE', or use a qualified name. [SQL:

CREATE TABLE "PUBLIC"."SURVEY_METADATA_COLUMN_NAMES" (

survey_id INTEGER,

survey_column_name TEXT )

]

连接如下:

user = input('Your Snowflake username: ')
password = getpass.getpass('Your Snowflake Password: ')
account = 'MY_ACCOUNT'
conn = snowCtx.connect(
    user=user,
    password=password,
    account=account,
    database='MY_DB',
    schema='PUBLIC',
    warehouse='COMPUTE_WH',
    role='SYSADMIN'
)

engine = create_engine(
    'snowflake://{user}:{password}@{account}/'.format(
        user=user,
        password=password,
        account=account,
        database='MY_DB',
        schema = 'PUBLIC',
        warehouse='COMPUTE_WH',
        role='SYSADMIN',
        cache_column_metadata=True
    )
)

我改为使用 write_pandas()

success, nchunks, nrows, _ = write_pandas(conn, 
                                          columns_df, 
                                          'SURVEY_METADATA_COLUMN_NAMES', 
                                          chunk_size = 300, 
                                          schema = 'PUBLIC')
                        print(success, nchunks, nrows)
if(success):
   print(filename+' columns uploaded')
else:
   print(filename+' columns were not uploaded')

它需要 pyarrow 库,所以我安装了它:

pip install pyarrow

我删除了与 sqlalchemy 相关的所有导入并保留了以下内容:

import pandas as pd
import os
import snowflake.connector as snowCtx
import getpass
import json
import numpy as np
from datetime import date, datetime
import time
from snowflake.connector.pandas_tools import write_pandas