Python - 多线程帮助 - 读取多个文件 - ETL 到 SQL 服务器

Python - Mult-Threading Help - Reading Multiple Files - ETL Into SQL Server

我正在开发一个从本地驱动器读取 DBF 文件并将数据加载到 sql 服务器表中的程序。我对 Python 很陌生,我发现了一些关于多线程的细节,其中大部分令人困惑。读取和插入的性能很慢,看看我的 CPU 使用情况,我有足够的能力。我也是 运行 SSD 的。

此代码将扩展为从大约 400 个 zip 文件中读取大约 20 个 DBF 文件。所以我们总共讨论了 8000 个 DBF 文件。

我很难做到这一点。能否指点一下?

这是我的代码(有点乱,稍后我会整理),

import os, pyodbc, datetime, shutil
from dbfread import DBF
from zipfile import ZipFile

# SQL Server Connection Test
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost\test;DATABASE=TEST_DBFIMPORT;UID=test;PWD=test')
cursor = cnxn.cursor()

dr = 'e:\Backups\dbf\'
work = 'e:\Backups\work\'
archive = 'e:\Backups\archive\'


for r in os.listdir(dr):

    curdate = datetime.datetime.now()
    filepath = dr + r
    process = work + r
    arc = archive + r

    pth = r.replace(".sss","")
    zipfolder = work + pth
    filedateunix = os.path.getctime(filepath)
    filedateconverted=datetime.datetime.fromtimestamp(int(filedateunix)
                                                  ).strftime('%Y-%m-%d %H:%M:%S')
    shutil.move(filepath,process)
    with ZipFile(process) as zf:
        zf.extractall(zipfolder)


    cursor.execute(
        "insert into tblBackups(backupname, filedate, dateadded) values(?,?,?)",
    pth, filedateconverted, curdate)
    cnxn.commit()

    for dirpath, subdirs, files in os.walk (zipfolder):

        for file in files:
            dateadded = datetime.datetime.now()

            if file.endswith(('.dbf','.DBF')):
                dbflocation = os.path.abspath(os.path.join(dirpath,file)).lower()

                if dbflocation.__contains__("\bk.dbf"):
                    table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore')
                    for record in table.records:
                        rec1 = str(record['code'])
                        rec2 = str(record['name'])
                        rec3 = str(record['addr1'])
                        rec4 = str(record['addr2'])
                        rec5 = str(record['city'])
                        rec6 = str(record['state'])
                        rec7 = str(record['zip'])
                        rec8 = str(record['tel'])
                        rec9 = str(record['fax'])
                        cursor.execute(
                       "insert into tblbk(code,name,addr1,addr2,city,state,zip,tel,fax) values(?,?,?,?,?,?,?,?,?)",
                        rec1, rec2, rec3, rec4, rec5, rec6, rec7, rec8, rec9, rec10, rec11, rec12, rec13)
                cnxn.commit()


                if dbflocation.__contains__("\cr.dbf"):
                    table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore')
                    for record in table.records:
                        rec2 = str(record['cal_desc'])
                        rec3 = str(record['b_date'])
                        rec4 = str(record['b_time'])
                        rec5 = str(record['e_time'])
                        rec6 = str(record['with_desc'])
                        rec7 = str(record['recuruntil'])
                        rec8 = record['notes']
                        rec9 = dateadded
                        cursor.execute(
                        "insert into tblcalendar(cal_desc,b_date,b_time,e_time,with_desc,recuruntil,notes,dateadded) values(?,?,?,?,?,?,?,?)",
                        rec2, rec3, rec4, rec5, rec6, rec7, rec8, rec9)
                cnxn.commit() 

    shutil.move(process, archive)
    shutil.rmtree(zipfolder)

tl;dr: 先测量,再修复!


请注意,在最常见的 Python 实现 (CPython) 中,一次只有一个线程可以执行 Python 字节码。 所以线程并不是提高 CPU-bound 性能的好方法。如果工作是 I/O-bound.

,他们可以很好地工作

但是你首先要做的是衡量。这怎么强调都不为过。如果您不知道导致性能下降的原因,则无法修复它!

编写 single-threaded 代码来完成这项工作,并 运行 在分析器下编写代码。首先尝试 built-in cProfile。如果那不能给您足够的信息,请尝试例如line profiler.

分析应该会告诉您哪些步骤耗时最多。一旦你知道了这一点,你就可以开始改进了。

比如用multiprocessing读取DBF文件是没有意义的,如果是把数据塞进SQL服务器这个动作耗时最多!这甚至可能会减慢速度,因为几个进程正在争夺 SQL 服务器的注意力。

如果 SQL 服务器不是瓶颈, 它可以处理多个连接,我会使用 multiprocessing,可能 Pool.map() 并行读取 DBF 并将数据填充到 SQL 服务器。在这种情况下,您应该 Pool.map 遍历 DBF 文件名列表,以便在工作进程中打开文件。

您可以尝试 executemany() 方法而不是在循环中单独插入。下面是一些 ETL 脚本中插入函数的示例:

def sql_insert(table_name, fields, rows, truncate_table = True):
    if len(rows) == 0:
        return

    cursor = mdwh_connection.cursor()
    cursor.fast_executemany = True
    values_sql = ('?, ' * (fields.count(',') + 1))[:-2]

    if truncate_table:
        sql_truncate(table_name, cursor)
    
    insert_sql = 'insert {0} ({1}) values ({2});'.format(table_name, fields, values_sql)
    current_row = 0
    batch_size = 50000

    while current_row < len(rows):
        cursor.executemany(insert_sql, rows[current_row:current_row + batch_size])
        mdwh_connection.commit()
        current_row += batch_size
        logging.info(
            '{} more records inserted. Total: {}'.format(
                min(batch_size,len(rows)-current_row+batch_size),
                min(current_row, len(rows))
            )
        )