Store and retrieve pickled Python objects to/from Snowflake
As per the title, I'm trying to store pickled Python objects in Snowflake so that I can retrieve them again at a later date. Any help with this would be greatly appreciated:
Snowflake table definition:
CREATE OR REPLACE TABLE <db>.<schema>.TESTING_MEMORY (
    MODEL_DATETIME DATETIME,
    SCALARS VARIANT
);
Python code:
import numpy as np
import pandas as pd
import pickle
from datetime import datetime
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from sklearn.preprocessing import StandardScaler
def create_snowflake_connection():
    conn = snowflake.connector.connect(
        user='<username>',
        account='<account>',
        password='<password>',
        warehouse='<wh>',
        database='<db>',
        role='<role>',
        schema='<schema>'
    )
    return conn
memory = {}
np.random.seed(78)
df = pd.DataFrame({
    'x1': np.random.normal(0, 2, 10000),
    'x2': np.random.normal(5, 3, 10000),
    'x3': np.random.normal(-5, 5, 10000)
})
scaler = StandardScaler()
scaler.fit(df)
scaled_df = scaler.transform(df)
scaled_df = pd.DataFrame(scaled_df, columns=['x1', 'x2', 'x3'])
memory['SCALARS'] = pickle.dumps(scaler)
ctx = create_snowflake_connection()
# Write to snowflake
db_dat = pd.DataFrame([list(memory.values())], columns=list(memory.keys()))
db_dat.insert(0, 'MODEL_DATETIME', datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))
success, nchunks, nrows, _ = write_pandas(conn=ctx, df=db_dat, table_name='TESTING_MEMORY')
# retrieve from snowflake
cur = ctx.cursor()
sql = """
SELECT hex_encode(SCALARS)
FROM <db>.<schema>.TESTING_MEMORY
QUALIFY ROW_NUMBER() OVER (ORDER BY MODEL_DATETIME DESC) = 1
"""
cur.execute(sql)
returned = cur.fetch_pandas_all()
cur.close()
ctx.close()
There's probably a better way to do this (disclaimer: I'm new to Python), but this seems to work and is based on the answer here:
1. Change the SQL table definition
CREATE OR REPLACE TABLE db.schema.TESTING_MEMORY (
    MODEL_DATETIME DATETIME,
    SCALARS VARCHAR
);
2. Changes to the Python code - general
import base64
3. Changes to the Python code (the write-to-Snowflake section above)
# Write to snowflake
db_dat = pd.DataFrame([list(memory.values())], columns=list(memory.keys()))
db_dat.insert(0, 'MODEL_DATETIME', datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"))
pickled_columns = ['SCALARS']
for column in pickled_columns:
    b64_bytes = base64.b64encode(db_dat[column].values[0])
    db_dat[column] = b64_bytes.decode('utf8')
success, nchunks, nrows, _ = write_pandas(conn=ctx, df=db_dat, table_name='TESTING_MEMORY')
4. Changes to the Python code - retrieving from Snowflake
cur = ctx.cursor()
sql = """
SELECT *
FROM db.schema.TESTING_MEMORY
QUALIFY ROW_NUMBER() OVER (ORDER BY MODEL_DATETIME DESC) = 1
"""
cur.execute(sql)
returned = cur.fetch_pandas_all()
for column in pickled_columns:
    returned[column] = base64.b64decode(returned[column].values[0])
new_dict = returned.to_dict('list')
for key, val in new_dict.items():
    new_dict[key] = val[0]
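As a sanity check on the round trip, the retrieved bytes can be unpickled and the restored scaler applied to the original data frame. A minimal sketch, assuming the new_dict and df variables from the code above are still in scope:

# Unpickle the bytes retrieved under 'SCALARS' back into a fitted StandardScaler
restored_scaler = pickle.loads(new_dict['SCALARS'])
# Transforming the original frame should match the output of the scaler that was pickled
restored_scaled = restored_scaler.transform(df)
print(restored_scaled[:3])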
It looks like you're trying to put a Python byte object into a Snowflake variant, and that won't work for you.
This answer is somewhat similar to the other answer suggested here, except that instead of storing the base64-encoded binary in a varchar field, it uses the binary type. Base64 encoding is around 30% larger than binary encoding, from what I've read somewhere.
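The size difference is easy to check directly: base64 emits 4 output characters for every 3 input bytes, so the overhead is roughly 33%. A small, self-contained sketch (the payload here is just an arbitrary pickled object for illustration):

import base64
import pickle

payload = pickle.dumps(list(range(1000)))   # raw pickled bytes
encoded = base64.b64encode(payload)         # base64 representation of the same bytes
print(len(payload), len(encoded), len(encoded) / len(payload))  # ratio is about 1.33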
Create a table with a binary data type:
create or replace table testdb.public.test_table (obj binary);
Hex-encode the pickled object, write it, read it back and call a method on it:
import pickle
import snowflake.connector
# This is the object we're going to store in Snowflake as binary
class PickleMe:
    def __init__(self, first_name, last_name):
        self.first_name = first_name
        self.last_name = last_name

    def say_hello(self):
        print(f'Hi there, {self.first_name} {self.last_name}')

# Create the object and store it as hex in the 'hex_person' variable
person = PickleMe('John', 'Doe')
hex_person = pickle.dumps(person).hex()

with snowflake.connector.connect(
    user="username",
    password="password",
    account="snowflake_account_deets",
    warehouse="warehouse_name",
) as con:
    # Write pickled object into table as binary
    con.cursor().execute(f"INSERT INTO testdb.public.test_table values(to_binary('{hex_person}', 'HEX'))")
    # Now get the object back and put it into the 'obj' variable
    (obj,) = con.cursor().execute("select obj from testdb.public.test_table").fetchone()
    # Deserialise object and call method on it
    person_obj = pickle.loads(obj, encoding='HEX')
    person_obj.say_hello()
The output of the above is
Hi there, John Doe
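Rather than formatting the hex string into the SQL text, it can also be passed as a bind parameter so the statement itself stays constant. A sketch of the same insert using the connector's default %s-style (pyformat) binding, assuming the PickleMe class and testdb.public.test_table from above:

person = PickleMe('Jane', 'Doe')
hex_person = pickle.dumps(person).hex()

with snowflake.connector.connect(
    user="username",
    password="password",
    account="snowflake_account_deets",
    warehouse="warehouse_name",
) as con:
    # Bind the hex string as a parameter instead of interpolating it into the SQL
    con.cursor().execute(
        "INSERT INTO testdb.public.test_table VALUES (to_binary(%s, 'HEX'))",
        (hex_person,),
    )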