How to delete and insert rows based on a column with SQLAlchemy in Python?

My code reads some Excel files and appends them to a list, adding a column that holds the file name. It then concatenates everything and sends it to a table.


all_df_list = []

for file in files_list:
    # read each Excel file and tag its rows with the source file name
    frame = pd.read_excel(file, header=0, engine='openpyxl')
    frame['filename'] = os.path.basename(file)
    all_df_list.append(frame)

xls=pd.concat(all_df_list)
xls.to_sql(table, con=engine, if_exists='append', index=False, chunksize=10000)

This code works fine, and the output looks like this:

Column A  Column B  filename
First     row       file 01.xlsx
Second    row       file 02.xlsx

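What I need now is to change my code so that it checks the filename column for os.path.basename(file), and then either overwrites only those rows, or deletes the rows where filename = os.path.basename(file) and then runs the code above. In other words: if that file name already exists, delete the corresponding rows and then run the code above.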

Is there any way to do this? I am using create_engine from sqlalchemy to access the database.

Updated answer:

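Assuming you want to minimize the work done on the database side and do as much as possible with dataframes, this gets the same result as my original answer (further down).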

Initial state: For testing purposes, initialize the main database table, SomeTable, to simulate the state left by a previous update:

xls=pd.concat([
    pd.DataFrame({'Column A':['First'], 'Column B':['row'], 'filename':['file 01.xlsx']}),
    pd.DataFrame({'Column A':['Second'], 'Column B':['row'], 'filename':['file 02.xlsx']}),
    pd.DataFrame({'Column A':['Third'], 'Column B':['row'], 'filename':['file 03.xlsx']})
])
xls.to_sql("SomeTable", con=engine, if_exists='replace', index=False, chunksize=10000)

Contents of SomeTable:

  Column A Column B      filename
0    First      row  file 01.xlsx
1   Second      row  file 02.xlsx
2    Third      row  file 03.xlsx

Update source: Specify the files to use for updating SomeTable in the database and load their base names into dfBasenames:

files_list = ['file 01.xlsx', 'file 04.xlsx']

import os

# collect the base names of the files in files_list into dfBasenames
basenames = set()
for file in files_list:
    basenames.add(os.path.basename(file))
dfBasenames = pd.DataFrame({'filename': list(basenames)})

Contents of dfBasenames:

       filename
0  file 04.xlsx
1  file 01.xlsx

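Delete stale rows: With df holding a copy of SomeTable (read with pd.read_sql_table, as in the full test code below), delete the rows in df whose filename value appears in dfBasenames:

df = df.join(dfBasenames.assign(is_stale=True).set_index('filename'), on='filename')
df = df[df['is_stale'].isna()].drop(columns=['is_stale'])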

Contents of df:

  Column A Column B      filename
1   Second      row  file 02.xlsx
2    Third      row  file 03.xlsx

Note that the original row with filename == "file 01.xlsx" has been deleted.
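The join on a temporary is_stale flag acts as an anti-join. As a rough alternative sketch, the same filter can be written with Series.isin, which skips the helper column. The frame below is a hand-built stand-in for the copy of SomeTable; the name remaining is only illustrative.

```python
import pandas as pd

# Stand-in for the copy of SomeTable held in df (same rows as the initial state)
df = pd.DataFrame({
    'Column A': ['First', 'Second', 'Third'],
    'Column B': ['row', 'row', 'row'],
    'filename': ['file 01.xlsx', 'file 02.xlsx', 'file 03.xlsx'],
})

# Base names of the files that are about to be (re)loaded
basenames = {'file 01.xlsx', 'file 04.xlsx'}

# Keep only the rows whose filename is NOT among the new base names
remaining = df[~df['filename'].isin(basenames)]
print(remaining)
```

Both forms leave the original index labels (1 and 2) on the surviving rows.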

Update df from the latest files: Aggregate the rows found in the Excel files in files_list and concat them with the non-stale rows remaining in df:

all_df_list = []
for file in files_list:  
    #reads and appends excel files
    frame = pd.read_excel(file)
    frame['filename'] = os.path.basename(file)
    print(f'\nfile {file}:\n{frame}')
    all_df_list.append(frame)
df = pd.concat([df] + all_df_list)

Contents of df:

  Column A Column B      filename
1   Second      row  file 02.xlsx
2    Third      row  file 03.xlsx
0   1.a.01   1.b.01  file 01.xlsx
1   1.a.02   1.b.02  file 01.xlsx
0   4.a.01   4.b.01  file 04.xlsx
1   4.a.02   4.b.02  file 04.xlsx
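The concatenated frame keeps each source's own index labels (1, 2, 0, 1, 0, 1 above). That is harmless here because to_sql is called with index=False, but if a clean 0..n-1 index is wanted in the dataframe itself, passing ignore_index=True to pd.concat is one option. A small sketch, with made-up frames (kept, fresh) standing in for df and the data read from one Excel file:

```python
import pandas as pd

# Made-up stand-ins: rows kept from the table, plus rows read from one file
kept = pd.DataFrame({'Column A': ['Second', 'Third'],
                     'Column B': ['row', 'row'],
                     'filename': ['file 02.xlsx', 'file 03.xlsx']}, index=[1, 2])
fresh = pd.DataFrame({'Column A': ['1.a.01', '1.a.02'],
                      'Column B': ['1.b.01', '1.b.02'],
                      'filename': ['file 01.xlsx', 'file 01.xlsx']})

# ignore_index=True discards the per-frame labels and renumbers from 0
combined = pd.concat([kept, fresh], ignore_index=True)
print(combined)
```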

Commit the changes to the database: Overwrite SomeTable in the database with df:

df.to_sql("SomeTable", con=engine, if_exists='replace', index=False, chunksize=10000)

Contents of SomeTable:

  Column A Column B      filename
0   Second      row  file 02.xlsx
1    Third      row  file 03.xlsx
2   1.a.01   1.b.01  file 01.xlsx
3   1.a.02   1.b.02  file 01.xlsx
4   4.a.01   4.b.01  file 04.xlsx
5   4.a.02   4.b.02  file 04.xlsx

Full test code:

from sqlalchemy import create_engine
import pandas as pd
connectable = 'sqlite:///foo.db'
engine = create_engine(connectable)

# For testing purposes only, initialize SomeTable so it's not empty
xls=pd.concat([
    pd.DataFrame({'Column A':['First'], 'Column B':['row'], 'filename':['file 01.xlsx']}),
    pd.DataFrame({'Column A':['Second'], 'Column B':['row'], 'filename':['file 02.xlsx']}),
    pd.DataFrame({'Column A':['Third'], 'Column B':['row'], 'filename':['file 03.xlsx']})
])
xls.to_sql("SomeTable", con=engine, if_exists='replace', index=False, chunksize=10000)

# read and print SomeTable for verification of initial state
df = pd.read_sql_table("SomeTable", connectable) 
print('======== VERIFY INITIAL STATE: read_sql_table("SomeTable"):')
print(df)

# specify files to use to update SomeTable in the database
files_list = ['file 01.xlsx', 'file 04.xlsx']

import os

# collect the base names of the files in files_list into dfBasenames
basenames = set()
for file in files_list:
    basenames.add(os.path.basename(file))
dfBasenames = pd.DataFrame({'filename': list(basenames)})
print('======== VERIFY FILENAMES TO UPDATE FROM:')
print(dfBasenames)

# delete rows in SomeTable copy with filename column found in dfBasenames
df = df.join(dfBasenames.assign(is_stale=True).set_index('filename'), on='filename')
df = df[df['is_stale'].isna()].drop(columns=['is_stale'])
print('======== VERIFY DELETION OF ROWS MATCHING NEW FILENAMES:')
print(df)

# aggregate rows found in Excel files in dfBasenames into all_df_list and concat to remaining non-stale rows of SomeTable copy
all_df_list = []
for file in files_list:  
    #reads and appends excel files
    frame = pd.read_excel(file)
    frame['filename'] = os.path.basename(file)
    print(f'\nfile {file}:\n{frame}')
    all_df_list.append(frame)
df = pd.concat([df] + all_df_list)
print('======== VERIFY UPDATED DF READY TO COMMIT TO DB:')
print(df)

# overwrite SomeTable in database
df.to_sql("SomeTable", con=engine, if_exists='replace', index=False, chunksize=10000)

# read and print table for verification of correct result
df = pd.read_sql_table("SomeTable", connectable) 
print('======== VERIFY UPDATED TABLE: read_sql_table("SomeTable"):')
print(df)

Test output:

======== VERIFY INITIAL STATE: read_sql_table("SomeTable"):
  Column A Column B      filename
0    First      row  file 01.xlsx
1   Second      row  file 02.xlsx
2    Third      row  file 03.xlsx
======== VERIFY FILENAMES TO UPDATE FROM:
       filename
0  file 04.xlsx
1  file 01.xlsx
======== VERIFY DELETION OF ROWS MATCHING NEW FILENAMES:
  Column A Column B      filename
1   Second      row  file 02.xlsx
2    Third      row  file 03.xlsx

file file 01.xlsx:
  Column A Column B      filename
0   1.a.01   1.b.01  file 01.xlsx
1   1.a.02   1.b.02  file 01.xlsx

file file 04.xlsx:
  Column A Column B      filename
0   4.a.01   4.b.01  file 04.xlsx
1   4.a.02   4.b.02  file 04.xlsx
======== VERIFY UPDATED DF READY TO COMMIT TO DB:
  Column A Column B      filename
1   Second      row  file 02.xlsx
2    Third      row  file 03.xlsx
0   1.a.01   1.b.01  file 01.xlsx
1   1.a.02   1.b.02  file 01.xlsx
0   4.a.01   4.b.01  file 04.xlsx
1   4.a.02   4.b.02  file 04.xlsx
======== VERIFY UPDATED TABLE: read_sql_table("SomeTable"):
  Column A Column B      filename
0   Second      row  file 02.xlsx
1    Third      row  file 03.xlsx
2   1.a.01   1.b.01  file 01.xlsx
3   1.a.02   1.b.02  file 01.xlsx
4   4.a.01   4.b.01  file 04.xlsx
5   4.a.02   4.b.02  file 04.xlsx


Original answer:

Here is one way to do what you are asking.

Initial state: (Same as in the updated answer above.)

Update source: Specify the files to use for updating SomeTable in the database and load their base names into a temporary database table, new_file:

files_list = ['file 01.xlsx', 'file 04.xlsx']
basenames = set()
for file in files_list:
    basenames.add(os.path.basename(file))
dfBasenames = pd.DataFrame({'filename': list(basenames)})
dfBasenames.to_sql("new_file", con=engine, if_exists='replace', index=False, chunksize=10000)

Contents of new_file:

       filename
0  file 01.xlsx
1  file 04.xlsx

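Delete stale rows: Delete the rows in SomeTable whose filename value is found in the new_file table:

from sqlalchemy import text

# text() is required for raw SQL strings in SQLAlchemy 2.0; engine.begin() commits when the block exits
with engine.begin() as connection:
    connection.execute(text('delete from SomeTable where exists (select 1 from new_file where new_file.filename = SomeTable.filename)'))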

Contents of SomeTable:

  Column A Column B      filename
0   Second      row  file 02.xlsx
1    Third      row  file 03.xlsx

Note that the original row with filename == "file 01.xlsx" has been deleted.
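If creating the helper table new_file is not desirable, one possible variation is to pass the base names directly to the DELETE through an "expanding" bind parameter, which SQLAlchemy renders as one placeholder per list element. This is only a sketch under assumptions: it uses an in-memory SQLite database, and the parameter name names is arbitrary.

```python
import pandas as pd
from sqlalchemy import bindparam, create_engine, text

engine = create_engine('sqlite://')  # in-memory SQLite, for illustration only

# Same initial rows as SomeTable in the initial state
pd.DataFrame({
    'Column A': ['First', 'Second', 'Third'],
    'Column B': ['row', 'row', 'row'],
    'filename': ['file 01.xlsx', 'file 02.xlsx', 'file 03.xlsx'],
}).to_sql('SomeTable', con=engine, if_exists='replace', index=False)

basenames = ['file 01.xlsx', 'file 04.xlsx']

# The expanding parameter turns ":names" into a parenthesized list of placeholders
stmt = text('DELETE FROM SomeTable WHERE filename IN :names').bindparams(
    bindparam('names', expanding=True)
)

# engine.begin() opens a transaction and commits it when the block exits cleanly
with engine.begin() as connection:
    connection.execute(stmt, {'names': basenames})

remaining = pd.read_sql_table('SomeTable', engine)
print(remaining)
```

For a very large number of file names the helper-table approach may still be preferable, since some databases limit the number of bound parameters per statement.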

Update the database from the latest files: Aggregate the rows found in the Excel files in files_list and append them to SomeTable:

all_df_list = []
for file in files_list:  
    #reads and appends excel files
    frame = pd.read_excel(file)
    frame['filename'] = os.path.basename(file)
    all_df_list.append(frame)
xls = pd.concat(all_df_list)
xls.to_sql("SomeTable", con=engine, if_exists='append', index=False, chunksize=10000)

Contents of SomeTable:

  Column A Column B      filename
0   Second      row  file 02.xlsx
1    Third      row  file 03.xlsx
2   1.a.01   1.b.01  file 01.xlsx
3   1.a.02   1.b.02  file 01.xlsx
4   4.a.01   4.b.01  file 04.xlsx
5   4.a.02   4.b.02  file 04.xlsx
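Because the delete and the append are separate steps, a failure between them would leave the table without the rows for the affected files. One way to guard against that, sketched below, is to run both on the same connection inside a single transaction. The helper name refresh_rows and the frames_by_name dictionary are hypothetical; the dataframes stand in for what pd.read_excel would return, and the database is in-memory SQLite.

```python
import pandas as pd
from sqlalchemy import bindparam, create_engine, text


def refresh_rows(engine, table, frames_by_name):
    """Delete rows whose filename matches a key, then append the new rows, in one transaction."""
    names = list(frames_by_name)
    new_rows = pd.concat(
        [frame.assign(filename=name) for name, frame in frames_by_name.items()],
        ignore_index=True,
    )
    # Identifiers cannot be bound, so the table name is interpolated and must be trusted
    delete_stmt = text(f'DELETE FROM {table} WHERE filename IN :names').bindparams(
        bindparam('names', expanding=True)
    )
    # If either step raises, engine.begin() rolls back both
    with engine.begin() as connection:
        connection.execute(delete_stmt, {'names': names})
        new_rows.to_sql(table, con=connection, if_exists='append', index=False)


engine = create_engine('sqlite://')  # in-memory SQLite, for illustration only
pd.DataFrame({
    'Column A': ['First', 'Second', 'Third'],
    'Column B': ['row', 'row', 'row'],
    'filename': ['file 01.xlsx', 'file 02.xlsx', 'file 03.xlsx'],
}).to_sql('SomeTable', con=engine, if_exists='replace', index=False)

# Stand-ins for the frames read from the two Excel files
refresh_rows(engine, 'SomeTable', {
    'file 01.xlsx': pd.DataFrame({'Column A': ['1.a.01', '1.a.02'], 'Column B': ['1.b.01', '1.b.02']}),
    'file 04.xlsx': pd.DataFrame({'Column A': ['4.a.01', '4.a.02'], 'Column B': ['4.b.01', '4.b.02']}),
})

result = pd.read_sql_table('SomeTable', engine)
print(result)
```

With real files, the dictionary could be built as {os.path.basename(f): pd.read_excel(f) for f in files_list}.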

Full test code: The test code (with print statements) is:

from sqlalchemy import create_engine
import pandas as pd
connectable = 'sqlite:///foo.db'
engine = create_engine(connectable)

# For testing purposes only, initialize SomeTable so it's not empty
xls=pd.concat([
    pd.DataFrame({'Column A':['First'], 'Column B':['row'], 'filename':['file 01.xlsx']}),
    pd.DataFrame({'Column A':['Second'], 'Column B':['row'], 'filename':['file 02.xlsx']}),
    pd.DataFrame({'Column A':['Third'], 'Column B':['row'], 'filename':['file 03.xlsx']})
])
xls.to_sql("SomeTable", con=engine, if_exists='replace', index=False, chunksize=10000)

# read and print SomeTable for verification of initial state
df = pd.read_sql_table("SomeTable", connectable) 
print('======== VERIFY INITIAL STATE: read_sql_table("SomeTable"):')
print(df)

# specify files to use to update SomeTable in the database
files_list = ['file 01.xlsx', 'file 04.xlsx']

import os

# accumulate new file names in files_list into "new_file"
basenames = set()
for file in files_list:
    basenames.add(os.path.basename(file))
dfBasenames = pd.DataFrame({'filename': list(basenames)})
dfBasenames.to_sql("new_file", con=engine, if_exists='replace', index=False, chunksize=10000)

# read and print table for verification of correct result
df = pd.read_sql_table("new_file", connectable) 
print('======== VERIFY FILENAMES TO UPDATE FROM: read_sql_table("new_file"):')
print(df)

# delete rows in SomeTable with filename column found in new_file table
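from sqlalchemy import text  # raw SQL strings must be wrapped in text() in SQLAlchemy 2.0
with engine.begin() as connection:  # commits when the block exits cleanly
    connection.execute(text('delete from SomeTable where exists (select 1 from new_file where new_file.filename = SomeTable.filename)'))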

# read and print table for verification of correct result
df = pd.read_sql_table("SomeTable", connectable) 
print('======== VERIFY DELETION OF ROWS MATCHING NEW FILENAMES: read_sql_table("SomeTable"):')
print(df)

# aggregate rows found in Excel files in files_list into all_df_list
all_df_list = []
for file in files_list:  
    #reads and appends excel files
    frame = pd.read_excel(file)
    frame['filename'] = os.path.basename(file)
    print(f'\nfile {file}:\n{frame}')
    all_df_list.append(frame)

# append rows in all_df_list to SomeTable
xls = pd.concat(all_df_list)
xls.to_sql("SomeTable", con=engine, if_exists='append', index=False, chunksize=10000)

# read and print table for verification of correct result
df = pd.read_sql_table("SomeTable", connectable) 
print('======== VERIFY UPDATED TABLE: read_sql_table("SomeTable"):')
print(df)

Test output:

======== VERIFY INITIAL STATE: read_sql_table("SomeTable"):
  Column A Column B      filename
0    First      row  file 01.xlsx
1   Second      row  file 02.xlsx
2    Third      row  file 03.xlsx
======== VERIFY FILENAMES TO UPDATE FROM: read_sql_table("new_file"):
       filename
0  file 01.xlsx
1  file 04.xlsx
======== VERIFY DELETION OF ROWS MATCHING NEW FILENAMES: read_sql_table("SomeTable"):
  Column A Column B      filename
0   Second      row  file 02.xlsx
1    Third      row  file 03.xlsx

file file 01.xlsx:
  Column A Column B      filename
0   1.a.01   1.b.01  file 01.xlsx
1   1.a.02   1.b.02  file 01.xlsx

file file 04.xlsx:
  Column A Column B      filename
0   4.a.01   4.b.01  file 04.xlsx
1   4.a.02   4.b.02  file 04.xlsx
======== VERIFY UPDATED TABLE: read_sql_table("SomeTable"):
  Column A Column B      filename
0   Second      row  file 02.xlsx
1    Third      row  file 03.xlsx
2   1.a.01   1.b.01  file 01.xlsx
3   1.a.02   1.b.02  file 01.xlsx
4   4.a.01   4.b.01  file 04.xlsx
5   4.a.02   4.b.02  file 04.xlsx

As I understand it, you want to delete rows by file name before inserting them again. Maybe this will help you:

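# a table name cannot be a bound parameter, so only the file name is bound (needs: from sqlalchemy import text)
with engine.begin() as connection:
    connection.execute(text(f'DELETE FROM {table} WHERE filename = :filename'), {'filename': file})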

If file is a full path, you should use os.path.basename(file) instead.