Using merge in python snowflake connector with pandas dataframe as a source

I am retrieving data from an API and converting it into a pandas dataframe. I am using the python-snowflake connector to send this data into my Snowflake schema as a table.

I want to use merge rather than send duplicate data to my Snowflake table.

Sample data retrieved from the API:

| log_in_ID | user_id | date       |
|-----------|---------|------------|
| 1         | 21      | 02/21/2021 |
| 2         | 22      | 02/24/2021 |
| 3         | 23      | 02/27/2021 |
| 4         | 21      | 02/29/2021 |

log_in_ID is unique.

Here is my code:

import requests
import json
import snowflake.connector
import pandas as pd
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

engine = create_engine(URL(
                 account='my_snowflake_account',
                 user='user',
                 password='password',
                 database='my_database',
                 schema='my_schema',
                 warehouse='warehouse',
                 role='ADMIN'))

pandas_df = 'Some code to get API data and convert into pandas dataframe'

def send_to_snowflake(pandas_df, target_table):
    connection = engine.connect()
    pandas_df.to_sql(target_table, con=engine, index=False, if_exists='append')
    connection.close()
    engine.dispose()

if __name__ == "__main__":
    send_to_snowflake(pandas_df, target_table)

How can I use a merge statement with log_in_id as the unique key?

And how can I use a pandas dataframe as the source in a merge query with snowflake-python?

merge into target_table using {pandas_dataframe}
      on target_table.log_in_id = {pandas_dataframe}.log_in_id
      when matched then
           update set target_table.user_id = {pandas_dataframe}.user_id,
                      target_table.date = {pandas_dataframe}.date

If your API data comes in a structure like this: [(1, 21, 'A'), (2, 22, 'AA'), (3, 23, 'AB'), (4, 21, 'AC')]

then the following code can merge the API data into the Snowflake target table without loading the source data into a table first:

from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL


def sample_func():
    engine = create_engine(URL(
                     account='xxx',
                     user='xxx',
                     password='xxx',
                     database='xxx',
                     schema='PUBLIC',
                     warehouse='COMPUTE_WH',
                     role='xxx',
    ))

    connection = engine.connect()
    # Table A stands in for the API data here; its rows become the merge source.
    source_query = 'select * from A'

    try:
        cursor_return = connection.execute(source_query)
        cursor_result = cursor_return.fetchall()
        # str() renders the list of row tuples as "[(...), (...)]";
        # stripping the brackets leaves a literal row list for the VALUES clause.
        api_data = str(cursor_result)[1:-1]
        print(api_data)
        merge_temp = """
        merge into B target_table using (select COLUMN1, COLUMN2, COLUMN3 from values{0}) src
            on target_table.log_in_id = src.COLUMN1
            when matched then
                update set target_table.log_in_id = src.COLUMN1,
                           target_table.user_id = src.COLUMN2,
                           target_table.test_data = src.COLUMN3
            when not matched then
                insert (log_in_id, user_id, test_data)
                values (src.COLUMN1, src.COLUMN2, src.COLUMN3)
        """.format(api_data)
        print(merge_temp)
        c_return = connection.execute(merge_temp)
        c_result = c_return.fetchall()
        print(c_result)

        # MERGE returns a single row: (number of rows inserted, number of rows updated).
        print("Number of rows inserted: {0} || Number of rows updated: {1}".format(c_result[0][0], c_result[0][1]))
    finally:
        connection.close()
        engine.dispose()

sample_func()
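
If your source really is a pandas dataframe rather than a query result, you can build the same literal row list from its rows. Below is a minimal sketch; df_to_values is a hypothetical helper, and its naive str()-based quoting only works for trusted data whose strings contain no quotes:

def df_to_values(df):
    # .values.tolist() converts numpy scalars to plain Python types, so
    # str() renders numbers bare and strings single-quoted, Snowflake-style.
    rows = [tuple(row) for row in df.values.tolist()]
    return str(rows)[1:-1]  # drop the surrounding [ ], leaving "(...), (...)"

You would then pass df_to_values(pandas_df) into the merge template in place of api_data.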

However, I would suggest loading your API data into a temporary table and running the merge statement against that temporary table; this approach is faster than merging it from a dataframe or a csv file.
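
A minimal sketch of that approach, assuming a plain snowflake.connector connection and a dataframe whose columns are named LOG_IN_ID, USER_ID and DATE to match the staged table (the temporary table name and connection parameters are illustrative):

import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

def merge_via_temp_table(pandas_df):
    # Connection parameters are placeholders, as in the question.
    conn = snowflake.connector.connect(
        account='my_snowflake_account', user='user', password='password',
        database='my_database', schema='my_schema',
        warehouse='warehouse', role='ADMIN')
    try:
        cur = conn.cursor()
        # Stage the dataframe in a temporary table; it is dropped with the session.
        cur.execute("create temporary table TEMP_LOGINS "
                    "(LOG_IN_ID number, USER_ID number, DATE varchar)")
        # write_pandas matches dataframe column names to table column names.
        write_pandas(conn, pandas_df, 'TEMP_LOGINS')
        # Merge the staged rows into the target, keyed on log_in_id.
        cur.execute("""
            merge into target_table t
            using TEMP_LOGINS s on t.log_in_id = s.log_in_id
            when matched then update set t.user_id = s.user_id, t.date = s.date
            when not matched then insert (log_in_id, user_id, date)
                 values (s.log_in_id, s.user_id, s.date)""")
    finally:
        conn.close()

This lets Snowflake bulk-load the staged rows instead of parsing thousands of inlined tuples in the MERGE text, which is why it tends to be faster.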