在 python 雪花连接器中使用合并 pandas 数据框作为源
Using merge in python snowflake connector with pandas dataframe as a source
我正在从 API 中检索数据并将数据转换为 pandas 数据帧。我正在使用 python-snowflake 连接器将此数据作为 table.
发送到我的雪花模式中
我想使用合并而不是将重复数据发送到我的雪花 table。
我从 API 中检索的示例数据:
|------------|-------------|------------|
| log_in_ID | user_id | date |
|------------|-------------|------------|
| 1 | 21 | 02/21/2021 |
| 2 | 22 | 02/24/2021 |
| 3 | 23 | 02/27/2021 |
| 4 | 21 | 02/29/2021 |
|------------|-------------|------------|
log_in_ID是独一无二的
这是我的代码:
import requests
import json
import snowflake.connector
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine(URL(
account='my_snowflake_account',
user='user',
password='password',
database='my_database'
schema='my_schema',
warehouse='warehouse',
role='ADMIN'))
pandas_df = 'Some code to get API data and convert into pandas dataframe'
def send_to_snowflake(pandas_df, target_table):
connect = engine.connect()
data.tosql(target_table, con=engine, index=False, if_exists='append')
connection.close()
engine.dispose()
if __name__ == "__main__":
send_to_snowflake(pandas_df, target_table)
如何使用合并语句将 log_in_id 作为唯一键?
如何在 snowflake-python 的合并查询中使用 pandas 数据框?
merge into target_table using {pandas_dataframe}
on target_table.log_in_id = {pandas_dataframe}.log_in_id
when matched then
update set target_table.user_id = {pandas_dataframe}.user_id and
set target_table.date = {pandas_dataframe}.date
如果您的 API 结构类似于以下格式:
[(1, 21, 'A'), (2, 22, 'AA'), (3, 23, 'AB'), (4, 21, 'AC')]
此代码可以将 API 数据合并到雪花目标 table 中,而无需将源数据加载到 tables:
import requests
import json
import snowflake.connector
import pandas as pd
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
def sample_func():
engine = create_engine(URL(
account='xxx',
user='xxx',
password='xxx',
database='xxx',
schema='PUBLIC',
warehouse='COMPUTE_WH',
role='xxx',
))
connection = engine.connect()
pandas_df = 'select * from A'
try:
cursor_return = connection.execute(pandas_df)
cursor_result = cursor_return.fetchall()
api_data = str(cursor_result)[1:-1]
print(api_data)
merge_temp = """
merge into B target_table using (select COLUMN1,COLUMN2,COLUMN3 from values{0}) src
on target_table.log_in_id = src.COLUMN1
when matched then
update set target_table.log_in_id = src.COLUMN1,
target_table.user_id = src.COLUMN2,
target_table.test_data = src.COLUMN3
when not matched then
insert
(log_in_id, user_id, test_data) values(src.COLUMN1, src.COLUMN2, src.COLUMN3)
""".format(str(api_data))
print(merge_temp)
c_return = connection.execute(merge_temp)
c_result = c_return.fetchall()
print(c_result)
print("Number rows inserted: {0} || Number of rows updated: {1}".format(str(c_result[0][0]), str(c_result[0][1])))
finally:
connection.close()
engine.dispose()
sample_func()
但我建议将您的 API 数据加载到临时 table 并在临时 table 上使用合并语句,这种方法比从数据帧或 csv 文件加载它更快.
我正在从 API 中检索数据并将数据转换为 pandas 数据帧。我正在使用 python-snowflake 连接器将此数据作为 table.
发送到我的雪花模式中我想使用合并而不是将重复数据发送到我的雪花 table。
我从 API 中检索的示例数据:
|------------|-------------|------------|
| log_in_ID | user_id | date |
|------------|-------------|------------|
| 1 | 21 | 02/21/2021 |
| 2 | 22 | 02/24/2021 |
| 3 | 23 | 02/27/2021 |
| 4 | 21 | 02/29/2021 |
|------------|-------------|------------|
log_in_ID是独一无二的
这是我的代码:
import requests
import json
import snowflake.connector
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine(URL(
account='my_snowflake_account',
user='user',
password='password',
database='my_database'
schema='my_schema',
warehouse='warehouse',
role='ADMIN'))
pandas_df = 'Some code to get API data and convert into pandas dataframe'
def send_to_snowflake(pandas_df, target_table):
connect = engine.connect()
data.tosql(target_table, con=engine, index=False, if_exists='append')
connection.close()
engine.dispose()
if __name__ == "__main__":
send_to_snowflake(pandas_df, target_table)
如何使用合并语句将 log_in_id 作为唯一键?
如何在 snowflake-python 的合并查询中使用 pandas 数据框?
merge into target_table using {pandas_dataframe}
on target_table.log_in_id = {pandas_dataframe}.log_in_id
when matched then
update set target_table.user_id = {pandas_dataframe}.user_id and
set target_table.date = {pandas_dataframe}.date
如果您的 API 结构类似于以下格式: [(1, 21, 'A'), (2, 22, 'AA'), (3, 23, 'AB'), (4, 21, 'AC')]
此代码可以将 API 数据合并到雪花目标 table 中,而无需将源数据加载到 tables:
import requests
import json
import snowflake.connector
import pandas as pd
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL
def sample_func():
engine = create_engine(URL(
account='xxx',
user='xxx',
password='xxx',
database='xxx',
schema='PUBLIC',
warehouse='COMPUTE_WH',
role='xxx',
))
connection = engine.connect()
pandas_df = 'select * from A'
try:
cursor_return = connection.execute(pandas_df)
cursor_result = cursor_return.fetchall()
api_data = str(cursor_result)[1:-1]
print(api_data)
merge_temp = """
merge into B target_table using (select COLUMN1,COLUMN2,COLUMN3 from values{0}) src
on target_table.log_in_id = src.COLUMN1
when matched then
update set target_table.log_in_id = src.COLUMN1,
target_table.user_id = src.COLUMN2,
target_table.test_data = src.COLUMN3
when not matched then
insert
(log_in_id, user_id, test_data) values(src.COLUMN1, src.COLUMN2, src.COLUMN3)
""".format(str(api_data))
print(merge_temp)
c_return = connection.execute(merge_temp)
c_result = c_return.fetchall()
print(c_result)
print("Number rows inserted: {0} || Number of rows updated: {1}".format(str(c_result[0][0]), str(c_result[0][1])))
finally:
connection.close()
engine.dispose()
sample_func()
但我建议将您的 API 数据加载到临时 table 并在临时 table 上使用合并语句,这种方法比从数据帧或 csv 文件加载它更快.