使用 python 对 postgres 执行 upsert 操作,例如 pandas to_sql 函数
perform upsert operation on postgres like pandas to_sql function using python
在问这个问题之前,我已经阅读了很多关于在 Postgres 上进行 UPSERT
操作的链接:
- PostgreSQL Upsert Using INSERT ON CONFLICT statement
- Anyway to Upsert database using PostgreSQL in Python
但是问题和他们不一样,因为功能不一样。我想要的是实现类似 pandas to_sql
的功能,它具有以下功能:
- 自动创建table
- 保留每列的数据类型
to_sql
的唯一缺点是它不能 UPSERT
在 Postgres 上运行。是否有通过将数据帧传递给它来实现预期功能(自动创建 table 基于列,执行 UPSERT 操作并保留数据类型)?
之前使用 Pandas to_sql 函数实现的代码:
class PostgreSQL:
def __init__(self):
postgres_config = config_dict[Consts.POSTGRES.value]
self.host = postgres_config[Consts.HOST.value]
self.port = postgres_config[Consts.PORT.value]
self.db_name = postgres_config[Consts.DB_NAME.value]
self.username = postgres_config[Consts.USERNAME.value]
self.password = postgres_config[Consts.PASSWORD.value]
def get_connection(self) -> object:
url_schema = Consts.POSTGRES_URL_SCHEMA.value.format(
self.username, self.password, self.host, self.port, self.db_name
)
try:
engine = create_engine(url_schema)
return engine
except Exception as e:
logger.error('Make sure you have provided correct credentials for the DB connection.')
raise e
def save_df_to_db(self, df: object, table_name: str) -> None:
df.to_sql(table_name, con=self.get_connection(), if_exists='append')
我写了一个非常通用的代码来执行 UPSERT
,Postgres 官方不支持(直到 2021 年 12 月),使用 Pandas 数据帧并以高效的方式。
通过使用以下代码,它将更新现有的主键,否则它将创建一个新的 table(以防 table 名称不存在)并将新记录添加到 table.
代码:
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, Table
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.automap import automap_base
class PostgreSQL:
def __init__(self):
postgres_config = config_dict[Consts.POSTGRES.value]
self.host = postgres_config[Consts.HOST.value]
self.port = postgres_config[Consts.PORT.value]
self.db_name = postgres_config[Consts.DB_NAME.value]
self.username = postgres_config[Consts.USERNAME.value]
self.password = postgres_config[Consts.PASSWORD.value]
def get_connection(self) -> object:
url_schema = 'postgresql://{}:{}@{}:{}/{}'.format(
self.username, self.password, self.host, self.port, self.db_name
)
try:
engine = create_engine(url_schema)
return engine
except Exception as e:
logger.error('Make sure you have provided correct credentials for the DB connection.')
raise e
def run_query(self, query: str) -> list:
engine = self.get_connection()
return engine.execute(query).fetchall()
def save_df_to_db(self, df: object, table_name: str) -> None:
root_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..')
engine = self.get_connection()
add_primary_key_query = get_query(root_dir, Directories.COMMON.value, 'add_primary_key.sql', table_name)
table_existence_query = get_query(root_dir, Directories.COMMON.value, 'table_existence.sql', table_name)
if not engine.execute(table_existence_query).first()[0]: # if table does not exist
logger.info('Create table automatically and from scratch!')
df.to_sql(table_name, con=self.get_connection(), if_exists='append')
engine.execute(add_primary_key_query)
else:
try:
df = df.replace("NaT", None)
df = df.replace(pd.NaT, None)
df = df.replace({pd.NaT: None})
df_dict = df.to_dict('records')
except AttributeError as e:
logger.error('Empty Dataframe!')
raise e
with engine.connect() as connection:
logger.info('Table already exists!')
base = automap_base()
base.prepare(engine, reflect=True,)
target_table = Table(table_name, base.metadata,
autoload=True, autoload_with=engine,)
chunks = [df_dict[i:i + 1000] for i in range(0, len(df_dict), 1000)]
for chunk in chunks:
stmt = insert(target_table).values(chunk)
update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
connection.execute(stmt.on_conflict_do_update(
constraint=f'{table_name}_pkey',
set_=update_dict)
)
logger.info('Saving data is successfully done.')
Table存在查询:
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = '{}'
);
添加主键查询:
ALTER TABLE {} add primary key (id);
在问这个问题之前,我已经阅读了很多关于在 Postgres 上进行 UPSERT
操作的链接:
- PostgreSQL Upsert Using INSERT ON CONFLICT statement
- Anyway to Upsert database using PostgreSQL in Python
但是问题和他们不一样,因为功能不一样。我想要的是实现类似 pandas to_sql
的功能,它具有以下功能:
- 自动创建table
- 保留每列的数据类型
to_sql
的唯一缺点是它不能 UPSERT
在 Postgres 上运行。是否有通过将数据帧传递给它来实现预期功能(自动创建 table 基于列,执行 UPSERT 操作并保留数据类型)?
之前使用 Pandas to_sql 函数实现的代码:
class PostgreSQL:
def __init__(self):
postgres_config = config_dict[Consts.POSTGRES.value]
self.host = postgres_config[Consts.HOST.value]
self.port = postgres_config[Consts.PORT.value]
self.db_name = postgres_config[Consts.DB_NAME.value]
self.username = postgres_config[Consts.USERNAME.value]
self.password = postgres_config[Consts.PASSWORD.value]
def get_connection(self) -> object:
url_schema = Consts.POSTGRES_URL_SCHEMA.value.format(
self.username, self.password, self.host, self.port, self.db_name
)
try:
engine = create_engine(url_schema)
return engine
except Exception as e:
logger.error('Make sure you have provided correct credentials for the DB connection.')
raise e
def save_df_to_db(self, df: object, table_name: str) -> None:
df.to_sql(table_name, con=self.get_connection(), if_exists='append')
我写了一个非常通用的代码来执行 UPSERT
,Postgres 官方不支持(直到 2021 年 12 月),使用 Pandas 数据帧并以高效的方式。
通过使用以下代码,它将更新现有的主键,否则它将创建一个新的 table(以防 table 名称不存在)并将新记录添加到 table.
代码:
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, Table
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.ext.automap import automap_base
class PostgreSQL:
def __init__(self):
postgres_config = config_dict[Consts.POSTGRES.value]
self.host = postgres_config[Consts.HOST.value]
self.port = postgres_config[Consts.PORT.value]
self.db_name = postgres_config[Consts.DB_NAME.value]
self.username = postgres_config[Consts.USERNAME.value]
self.password = postgres_config[Consts.PASSWORD.value]
def get_connection(self) -> object:
url_schema = 'postgresql://{}:{}@{}:{}/{}'.format(
self.username, self.password, self.host, self.port, self.db_name
)
try:
engine = create_engine(url_schema)
return engine
except Exception as e:
logger.error('Make sure you have provided correct credentials for the DB connection.')
raise e
def run_query(self, query: str) -> list:
engine = self.get_connection()
return engine.execute(query).fetchall()
def save_df_to_db(self, df: object, table_name: str) -> None:
root_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..')
engine = self.get_connection()
add_primary_key_query = get_query(root_dir, Directories.COMMON.value, 'add_primary_key.sql', table_name)
table_existence_query = get_query(root_dir, Directories.COMMON.value, 'table_existence.sql', table_name)
if not engine.execute(table_existence_query).first()[0]: # if table does not exist
logger.info('Create table automatically and from scratch!')
df.to_sql(table_name, con=self.get_connection(), if_exists='append')
engine.execute(add_primary_key_query)
else:
try:
df = df.replace("NaT", None)
df = df.replace(pd.NaT, None)
df = df.replace({pd.NaT: None})
df_dict = df.to_dict('records')
except AttributeError as e:
logger.error('Empty Dataframe!')
raise e
with engine.connect() as connection:
logger.info('Table already exists!')
base = automap_base()
base.prepare(engine, reflect=True,)
target_table = Table(table_name, base.metadata,
autoload=True, autoload_with=engine,)
chunks = [df_dict[i:i + 1000] for i in range(0, len(df_dict), 1000)]
for chunk in chunks:
stmt = insert(target_table).values(chunk)
update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}
connection.execute(stmt.on_conflict_do_update(
constraint=f'{table_name}_pkey',
set_=update_dict)
)
logger.info('Saving data is successfully done.')
Table存在查询:
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = '{}'
);
添加主键查询:
ALTER TABLE {} add primary key (id);