Store and retrieve pickled Python objects to/from Snowflake

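As the title says, I am trying to store pickled Python objects in Snowflake so that I can retrieve them again later. Any help with this would be much appreciated: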

Snowflake table definition:

CREATE OR REPLACE TABLE <db>.<schema>.TESTING_MEMORY (
    MODEL_DATETIME DATETIME,
    SCALARS VARIANT
);

Python code:

import numpy as np
import pandas as pd
import pickle
from datetime import datetime
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from sklearn.preprocessing import StandardScaler

def create_snowflake_connection():
    conn = snowflake.connector.connect(
        user='<username>',
        account='<account>',
        password = '<password>',
        warehouse='<wh>',
        database='<db>',
        role='<role>',
        schema='<schema>'
    )
    
    return conn

memory = {}

np.random.seed(78)
df = pd.DataFrame({
    'x1': np.random.normal(0, 2, 10000),
    'x2': np.random.normal(5, 3, 10000),
    'x3': np.random.normal(-5, 5, 10000)
})

scaler = StandardScaler()
scaler.fit(df)

scaled_df = scaler.transform(df)
scaled_df = pd.DataFrame(scaled_df, columns=['x1', 'x2', 'x3'])


memory['SCALARS'] = pickle.dumps(scaler)
    

ctx = create_snowflake_connection()


# Write to snowflake
db_dat = pd.DataFrame([list(memory.values())], columns=list(memory.keys()))
db_dat.insert(0, 'MODEL_DATETIME', datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))
success, nchunks, nrows, _ = write_pandas(conn=ctx, df = db_dat, table_name = 'TESTING_MEMORY')
  
# Retrieve from snowflake
cur = ctx.cursor()
sql = """
        SELECT hex_encode(SCALARS)
        FROM <db>.<schema>.TESTING_MEMORY
        QUALIFY ROW_NUMBER() OVER (ORDER BY MODEL_DATETIME DESC) = 1
"""
cur.execute(sql) 
returned = cur.fetch_pandas_all() 


cur.close()
ctx.close()

There may be a better way to do this (disclaimer: I am new to Python), but this seems to work and is based on the answer here:

1. Change the SQL table definition

    CREATE OR REPLACE TABLE db.schema.TESTING_MEMORY (
    MODEL_DATETIME DATETIME,
    SCALARS VARCHAR
    );
    

2. Changes to the Python code - general

import base64

3. Changes to the Python code (the "Write to snowflake" section above)

# Write to snowflake
db_dat = pd.DataFrame([list(memory.values())], columns=list(memory.keys()))
db_dat.insert(0, 'MODEL_DATETIME', datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))

pickled_columns = ['SCALARS']
for column in pickled_columns:
    b64_bytes = base64.b64encode(db_dat[column].values[0])  
    db_dat[column] = b64_bytes.decode('utf8')
success, nchunks, nrows, _ = write_pandas(conn=ctx, df = db_dat, table_name = 'TESTING_MEMORY')

4. Changes to the Python code - retrieving from Snowflake

    cur = ctx.cursor() 
    
    sql = """
         SELECT *
         FROM db.schema.TESTING_MEMORY
         QUALIFY ROW_NUMBER() OVER (ORDER BY MODEL_DATETIME DESC) = 1
    """
    
    cur.execute(sql) 
    returned = cur.fetch_pandas_all() 
    
    for column in pickled_columns:
        returned[column] =  base64.b64decode(returned[column].values[0])
    
    
    new_dict = returned.to_dict('list')
    for key,val in new_dict.items():
        new_dict[key] = val[0]
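
The encode and decode steps above can be checked locally without a Snowflake connection. This is a minimal sketch: the `original` dict is a hypothetical stand-in for the fitted scaler, and the `encoded` string plays the role of the value stored in the VARCHAR column.

```python
import base64
import pickle

# Hypothetical stand-in for the fitted scaler; any picklable object works.
original = {"mean": [0.0, 5.0, -5.0], "scale": [2.0, 3.0, 5.0]}

# Write side: pickle -> bytes -> base64 -> ASCII str (safe for a VARCHAR column).
encoded = base64.b64encode(pickle.dumps(original)).decode("ascii")

# Read side: str -> base64 decode -> bytes -> unpickle.
restored = pickle.loads(base64.b64decode(encoded))

print(type(encoded).__name__)
print(restored == original)
```

If the round trip works here, any remaining problem is in how the value is written to or read from the table rather than in the encoding itself.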
    

It looks like you are trying to put a Python bytes object into a Snowflake VARIANT column, which will not work.

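This answer is somewhat similar to the other answer suggested here, except that instead of storing the base64-encoded pickle in a VARCHAR field, it uses the BINARY type. I read somewhere that base64 encoding is about 30% larger than the raw binary (the exact overhead is 4 characters for every 3 bytes, roughly 33%).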
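
The size claim is easy to check. The sketch below compares the length of a payload with its base64 and hex text forms; `raw` is random bytes standing in for a pickled object. Note that the hex form is only the text used to transmit the value; my understanding (an assumption, not verified here) is that a BINARY column stores the decoded bytes.

```python
import base64
import os

# Random bytes standing in for pickle.dumps(obj).
raw = os.urandom(3000)

b64_len = len(base64.b64encode(raw))  # 4 characters per 3 bytes
hex_len = len(raw.hex())              # 2 characters per byte

print(len(raw), b64_len, hex_len)  # 3000 4000 6000
```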

Create a table with a BINARY column:

create or replace table testdb.public.test_table (obj binary);

Hex-encode the pickled object, write it, read it back and call a method on it:

import pickle
import snowflake.connector

# This is the object we're going to store in Snowflake as binary
class PickleMe:
    def __init__(self, first_name, last_name):
        self.first_name = first_name
        self.last_name = last_name

    def say_hello(self):
        print(f'Hi there, {self.first_name} {self.last_name}')

# Create the object and store it as hex in the 'hex_person' variable
person = PickleMe('John', 'Doe')
hex_person = pickle.dumps(person).hex()

with snowflake.connector.connect(
    user="username",
    password="password",
    account="snowflake_account_deets",
    warehouse="warehouse_name",
) as con:
    # Write pickled object into table as binary
    con.cursor().execute(f"INSERT INTO testdb.public.test_table values(to_binary('{hex_person}', 'HEX'))")

    # Now get the object back and put it into the 'obj' variable
    (obj,) = con.cursor().execute("select obj from testdb.public.test_table").fetchone()

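    # Deserialise the object and call a method on it. The BINARY column comes
    # back as raw bytes, so no hex decoding is needed here; the 'encoding'
    # argument of pickle.loads only affects strings from Python 2 pickles.
    person_obj = pickle.loads(obj)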
    person_obj.say_hello()

The output of the above is

Hi there, John Doe
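
The hex round trip can also be sketched without a database. Here `bytes.fromhex` plays the part of `to_binary(..., 'HEX')`, and the `fetched` bytearray is an assumption about what the connector hands back for a BINARY column (some bytes-like object); the `payload` dict is a hypothetical example object.

```python
import pickle

# Hypothetical example object; any picklable object works.
payload = {"first_name": "John", "last_name": "Doe"}

# Write side: the hex text that would be embedded in the INSERT statement.
hex_text = pickle.dumps(payload).hex()

# Read side: simulate the bytes-like value returned for the BINARY column.
fetched = bytearray(bytes.fromhex(hex_text))

# pickle.loads accepts any bytes-like object, so no extra decoding is needed.
restored = pickle.loads(fetched)
print(restored == payload)
```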