Python - 多线程帮助 - 读取多个文件 - ETL 到 SQL 服务器
Python - Mult-Threading Help - Reading Multiple Files - ETL Into SQL Server
我正在开发一个从本地驱动器读取 DBF 文件并将数据加载到 sql 服务器表中的程序。我对 Python 很陌生,我发现了一些关于多线程的细节,其中大部分令人困惑。读取和插入的性能很慢,看看我的 CPU 使用情况,我有足够的能力。我也是 运行 SSD 的。
此代码将扩展为从大约 400 个 zip 文件中读取大约 20 个 DBF 文件。所以我们总共讨论了 8000 个 DBF 文件。
我很难做到这一点。能否指点一下?
这是我的代码(有点乱,稍后我会整理),
import os, pyodbc, datetime, shutil
from dbfread import DBF
from zipfile import ZipFile
# SQL Server Connection Test
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost\test;DATABASE=TEST_DBFIMPORT;UID=test;PWD=test')
cursor = cnxn.cursor()
dr = 'e:\Backups\dbf\'
work = 'e:\Backups\work\'
archive = 'e:\Backups\archive\'
for r in os.listdir(dr):
curdate = datetime.datetime.now()
filepath = dr + r
process = work + r
arc = archive + r
pth = r.replace(".sss","")
zipfolder = work + pth
filedateunix = os.path.getctime(filepath)
filedateconverted=datetime.datetime.fromtimestamp(int(filedateunix)
).strftime('%Y-%m-%d %H:%M:%S')
shutil.move(filepath,process)
with ZipFile(process) as zf:
zf.extractall(zipfolder)
cursor.execute(
"insert into tblBackups(backupname, filedate, dateadded) values(?,?,?)",
pth, filedateconverted, curdate)
cnxn.commit()
for dirpath, subdirs, files in os.walk (zipfolder):
for file in files:
dateadded = datetime.datetime.now()
if file.endswith(('.dbf','.DBF')):
dbflocation = os.path.abspath(os.path.join(dirpath,file)).lower()
if dbflocation.__contains__("\bk.dbf"):
table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore')
for record in table.records:
rec1 = str(record['code'])
rec2 = str(record['name'])
rec3 = str(record['addr1'])
rec4 = str(record['addr2'])
rec5 = str(record['city'])
rec6 = str(record['state'])
rec7 = str(record['zip'])
rec8 = str(record['tel'])
rec9 = str(record['fax'])
cursor.execute(
"insert into tblbk(code,name,addr1,addr2,city,state,zip,tel,fax) values(?,?,?,?,?,?,?,?,?)",
rec1, rec2, rec3, rec4, rec5, rec6, rec7, rec8, rec9, rec10, rec11, rec12, rec13)
cnxn.commit()
if dbflocation.__contains__("\cr.dbf"):
table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore')
for record in table.records:
rec2 = str(record['cal_desc'])
rec3 = str(record['b_date'])
rec4 = str(record['b_time'])
rec5 = str(record['e_time'])
rec6 = str(record['with_desc'])
rec7 = str(record['recuruntil'])
rec8 = record['notes']
rec9 = dateadded
cursor.execute(
"insert into tblcalendar(cal_desc,b_date,b_time,e_time,with_desc,recuruntil,notes,dateadded) values(?,?,?,?,?,?,?,?)",
rec2, rec3, rec4, rec5, rec6, rec7, rec8, rec9)
cnxn.commit()
shutil.move(process, archive)
shutil.rmtree(zipfolder)
tl;dr: 先测量,再修复!
请注意,在最常见的 Python 实现 (CPython) 中,一次只有一个线程可以执行 Python 字节码。
所以线程并不是提高 CPU-bound 性能的好方法。如果工作是 I/O-bound.
,他们可以很好地工作
但是你首先要做的是衡量。这怎么强调都不为过。如果您不知道导致性能下降的原因,则无法修复它!
编写 single-threaded 代码来完成这项工作,并 运行 在分析器下编写代码。首先尝试 built-in cProfile
。如果那不能给您足够的信息,请尝试例如line profiler.
分析应该会告诉您哪些步骤耗时最多。一旦你知道了这一点,你就可以开始改进了。
比如用multiprocessing
读取DBF文件是没有意义的,如果是把数据塞进SQL服务器这个动作耗时最多!这甚至可能会减慢速度,因为几个进程正在争夺 SQL 服务器的注意力。
如果 SQL 服务器不是瓶颈,和 它可以处理多个连接,我会使用 multiprocessing
,可能 Pool.map()
并行读取 DBF 并将数据填充到 SQL 服务器。在这种情况下,您应该 Pool.map
遍历 DBF 文件名列表,以便在工作进程中打开文件。
您可以尝试 executemany()
方法而不是在循环中单独插入。下面是一些 ETL 脚本中插入函数的示例:
def sql_insert(table_name, fields, rows, truncate_table = True):
if len(rows) == 0:
return
cursor = mdwh_connection.cursor()
cursor.fast_executemany = True
values_sql = ('?, ' * (fields.count(',') + 1))[:-2]
if truncate_table:
sql_truncate(table_name, cursor)
insert_sql = 'insert {0} ({1}) values ({2});'.format(table_name, fields, values_sql)
current_row = 0
batch_size = 50000
while current_row < len(rows):
cursor.executemany(insert_sql, rows[current_row:current_row + batch_size])
mdwh_connection.commit()
current_row += batch_size
logging.info(
'{} more records inserted. Total: {}'.format(
min(batch_size,len(rows)-current_row+batch_size),
min(current_row, len(rows))
)
)
我正在开发一个从本地驱动器读取 DBF 文件并将数据加载到 sql 服务器表中的程序。我对 Python 很陌生,我发现了一些关于多线程的细节,其中大部分令人困惑。读取和插入的性能很慢,看看我的 CPU 使用情况,我有足够的能力。我也是 运行 SSD 的。
此代码将扩展为从大约 400 个 zip 文件中读取大约 20 个 DBF 文件。所以我们总共讨论了 8000 个 DBF 文件。
我很难做到这一点。能否指点一下?
这是我的代码(有点乱,稍后我会整理),
import os, pyodbc, datetime, shutil
from dbfread import DBF
from zipfile import ZipFile
# SQL Server Connection Test
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost\test;DATABASE=TEST_DBFIMPORT;UID=test;PWD=test')
cursor = cnxn.cursor()
dr = 'e:\Backups\dbf\'
work = 'e:\Backups\work\'
archive = 'e:\Backups\archive\'
for r in os.listdir(dr):
curdate = datetime.datetime.now()
filepath = dr + r
process = work + r
arc = archive + r
pth = r.replace(".sss","")
zipfolder = work + pth
filedateunix = os.path.getctime(filepath)
filedateconverted=datetime.datetime.fromtimestamp(int(filedateunix)
).strftime('%Y-%m-%d %H:%M:%S')
shutil.move(filepath,process)
with ZipFile(process) as zf:
zf.extractall(zipfolder)
cursor.execute(
"insert into tblBackups(backupname, filedate, dateadded) values(?,?,?)",
pth, filedateconverted, curdate)
cnxn.commit()
for dirpath, subdirs, files in os.walk (zipfolder):
for file in files:
dateadded = datetime.datetime.now()
if file.endswith(('.dbf','.DBF')):
dbflocation = os.path.abspath(os.path.join(dirpath,file)).lower()
if dbflocation.__contains__("\bk.dbf"):
table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore')
for record in table.records:
rec1 = str(record['code'])
rec2 = str(record['name'])
rec3 = str(record['addr1'])
rec4 = str(record['addr2'])
rec5 = str(record['city'])
rec6 = str(record['state'])
rec7 = str(record['zip'])
rec8 = str(record['tel'])
rec9 = str(record['fax'])
cursor.execute(
"insert into tblbk(code,name,addr1,addr2,city,state,zip,tel,fax) values(?,?,?,?,?,?,?,?,?)",
rec1, rec2, rec3, rec4, rec5, rec6, rec7, rec8, rec9, rec10, rec11, rec12, rec13)
cnxn.commit()
if dbflocation.__contains__("\cr.dbf"):
table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore')
for record in table.records:
rec2 = str(record['cal_desc'])
rec3 = str(record['b_date'])
rec4 = str(record['b_time'])
rec5 = str(record['e_time'])
rec6 = str(record['with_desc'])
rec7 = str(record['recuruntil'])
rec8 = record['notes']
rec9 = dateadded
cursor.execute(
"insert into tblcalendar(cal_desc,b_date,b_time,e_time,with_desc,recuruntil,notes,dateadded) values(?,?,?,?,?,?,?,?)",
rec2, rec3, rec4, rec5, rec6, rec7, rec8, rec9)
cnxn.commit()
shutil.move(process, archive)
shutil.rmtree(zipfolder)
tl;dr: 先测量,再修复!
请注意,在最常见的 Python 实现 (CPython) 中,一次只有一个线程可以执行 Python 字节码。 所以线程并不是提高 CPU-bound 性能的好方法。如果工作是 I/O-bound.
,他们可以很好地工作但是你首先要做的是衡量。这怎么强调都不为过。如果您不知道导致性能下降的原因,则无法修复它!
编写 single-threaded 代码来完成这项工作,并 运行 在分析器下编写代码。首先尝试 built-in cProfile
。如果那不能给您足够的信息,请尝试例如line profiler.
分析应该会告诉您哪些步骤耗时最多。一旦你知道了这一点,你就可以开始改进了。
比如用multiprocessing
读取DBF文件是没有意义的,如果是把数据塞进SQL服务器这个动作耗时最多!这甚至可能会减慢速度,因为几个进程正在争夺 SQL 服务器的注意力。
如果 SQL 服务器不是瓶颈,和 它可以处理多个连接,我会使用 multiprocessing
,可能 Pool.map()
并行读取 DBF 并将数据填充到 SQL 服务器。在这种情况下,您应该 Pool.map
遍历 DBF 文件名列表,以便在工作进程中打开文件。
您可以尝试 executemany()
方法而不是在循环中单独插入。下面是一些 ETL 脚本中插入函数的示例:
def sql_insert(table_name, fields, rows, truncate_table = True):
if len(rows) == 0:
return
cursor = mdwh_connection.cursor()
cursor.fast_executemany = True
values_sql = ('?, ' * (fields.count(',') + 1))[:-2]
if truncate_table:
sql_truncate(table_name, cursor)
insert_sql = 'insert {0} ({1}) values ({2});'.format(table_name, fields, values_sql)
current_row = 0
batch_size = 50000
while current_row < len(rows):
cursor.executemany(insert_sql, rows[current_row:current_row + batch_size])
mdwh_connection.commit()
current_row += batch_size
logging.info(
'{} more records inserted. Total: {}'.format(
min(batch_size,len(rows)-current_row+batch_size),
min(current_row, len(rows))
)
)