Pandas errors in writing chunks to database with df.to_sql()

Existing database and expected result:

I have a large SQLite database (12 GB, with a table of more than 44 million rows) that I want to modify using pandas in Python 3.

Example Objective: I hope to read one of these large tables (44 million rows) into a DF in chunks, manipulate the DF chunk, and write the result to a new table. If possible, I would like to replace the new table if it exists, and append each chunk to it.

Because my manipulations only add or modify columns, the new table should have the same number of rows as the original table.

Problem:

The main problem appears to stem from the following line in the code below:

df.to_sql(new_table, con=db, if_exists = "append", index=False)

  1. When this line in the code below is run, I always seem to get one extra chunk of size=N, plus one more observation than I expect.
  2. The first time this code is run with a new table name, I get an error:
 Traceback (most recent call last):
  File "example.py", line 23, in <module>
    for df in df_generator:
  File "/usr/local/lib/python3.5/site-packages/pandas/io/sql.py", line 1420, in _query_iterator
    data = cursor.fetchmany(chunksize)
sqlite3.OperationalError: SQL logic error or missing database
  3. If I re-run the script with the same new table name, every chunk runs, plus one extra chunk and one extra row.

  4. When the df.to_sql() line is commented out, the loop runs for the expected number of chunks.

Full code example to test the problem:

Full code: example.py

import pandas as pd
import sqlite3

#Helper Functions Used in Example
def ren(invar, outvar, df):
    df.rename(columns={invar:outvar}, inplace=True)
    return(df)

def count_result(c, table):
    ([print("[*] total: {:,} rows in {} table"
        .format(r[0], table)) 
        for r in c.execute("SELECT COUNT(*) FROM {};".format(table))])


#Connect to Data
db = sqlite3.connect("test.db")
c = db.cursor()
new_table = "new_table"

#Load Data in Chunks
df_generator = pd.read_sql_query("select * from test_table limit 10000;", con=db, chunksize = 5000)

for df in df_generator:
    #Functions to modify data, example
    df = ren("name", "renamed_name", df)
    print(df.shape)
    df.to_sql(new_table, con=db, if_exists = "append", index=False)


#Count if new table is created
try:
    count_result(c, new_table)
except:
    pass

1. Result when #df.to_sql(new_table, con=db, if_exists = "append", index=False)

(the problem line is commented out):

$ python3 example.py 
(5000, 22)
(5000, 22)

This is what I expect, since the example code limits my large table to 10k rows.

2. Result when df.to_sql(new_table, con=db, if_exists = "append", index=False)

a. the problem line is not commented out

b. this is the first time the code is run with a new_table:

$ python3 example.py 
(5000, 22)
Traceback (most recent call last):
  File "example.py", line 23, in <module>
    for df in df_generator:
  File "/usr/local/lib/python3.5/site-packages/pandas/io/sql.py", line 1420, in _query_iterator
    data = cursor.fetchmany(chunksize)
sqlite3.OperationalError: SQL logic error or missing database

3. Result when df.to_sql(new_table, con=db, if_exists = "append", index=False)

a. the problem line is not commented out

b. the above code is run a second time with the new_table:

$ python3 example.py 
(5000, 22)
(5000, 22)
(5000, 22)
(1, 22)
[*] total: 20,001 rows in new_table table

So the problems I'm running into are, first, that the code breaks on the first run (Result 2), and second, that the total row count on the second run (Result 3) is more than double what I expect.

Any advice on how to fix this would be greatly appreciated.

You can try specifying:

db = sqlite3.connect("test.db", isolation_level=None)
#  ---->                        ^^^^^^^^^^^^^^^^^^^^

Apart from that, you may try to increase the chunksize, otherwise the time between commits is too short for the SQLite DB, which is causing this error, I guess... I would also recommend using PostgreSQL, MySQL/MariaDB or something similar: they are much more reliable and better suited to a database of that size...
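
For reference, a minimal sketch of the question's loop with this change applied (same database, table, and column names as in the question; the rename is inlined here only to keep the sketch self-contained):

import pandas as pd
import sqlite3

#autocommit mode: the driver holds no open transaction between statements
db = sqlite3.connect("test.db", isolation_level=None)

new_table = "new_table"
df_generator = pd.read_sql_query("select * from test_table limit 10000;",
                                 con=db, chunksize=5000)

for df in df_generator:
    df = df.rename(columns={"name": "renamed_name"})
    df.to_sql(new_table, con=db, if_exists="append", index=False)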

Time delay in the above solution

@MaxU's solution of adding isolation_level=None to the database connection is short and sweet. However, for whatever reason, it dramatically slows down writing/committing each chunk to the database. For example, when I tested the solution on a table of 12 million rows, the code took over 6 hours to complete. In contrast, building the original table from several text files took only minutes.

This insight led to a faster but less elegant solution, which took less than 7 minutes to complete on a table of 12 million rows versus over 6 hours. The output rows matched the input rows, solving the problem in my original question.

Faster but less elegant solution

Since I had built the original table from text/csv files and loaded the data with SQL scripts, I combined that approach with pandas' chunk feature. The basic steps required are:

  1. Connect to the database
  2. Create a new table with a SQL script (the columns and their order should match whatever you do to the pandas df)
  3. Read the huge table in chunks
  4. For each chunk, modify the df as needed, write it to a csv, load the csv with SQL, then commit the changes.

Main solution code:

import pandas as pd
import sqlite3

#Note I Used Functions I Wrote in build_db.py
#(shown below after example solution)
from build_db import *


#Helper Functions Used in Example
def lower_var(var, df):
    s = df[var].str.lower()
    df = df.drop(var, axis=1)
    df = pd.concat([df, s], axis=1)
    return(df)


#Connect to Data
db = sqlite3.connect("test.db")
c = db.cursor()

#create statement
create_table(c, "create_test.sql", path='sql_clean/')

#Load Data in Chunks
df_generator = pd.read_sql_query("select * from example_table;", con=db, chunksize = 100000)

for df in df_generator:
    #functions to modify data, example
    df = lower_var("name", df) #changes column order

    #restore df to column order in sql table
    db_order = ["cmte_id", "amndt_ind", "rpt_tp", "transaction_pgi", "image_num", "transaction_tp", \
        "entity_tp", "name", "city", "state", "zip_code", "employer", "occupation", "transaction_dt", \
        "transaction_amt", "other_id", "tran_id", "file_num", "memo_cd", "memo_text", "sub_id"]
    df = df[db_order]

    #write chunk to csv
    file = "df_chunk.csv"
    df.to_csv(file, sep='|', header=None, index=False)

    #insert chunk csv to db
    insert_file_into_table(c, "insert_test.sql", file, '|', path='sql_clean/')
    db.commit()


#Count results
count_result(c, "test_indiv")

User functions imported by the above code

#Relevant Functions in build_db.py

#module-level imports these functions rely on
import csv
import sqlite3
import subprocess

def count_result(c, table):
    ([print("[*] total: {:,} rows in {} table"
        .format(r[0], table)) 
        for r in c.execute("SELECT COUNT(*) FROM {};".format(table))])

def create_table(cursor, sql_script, path='sql/'):
    print("[*] create table with {}{}".format(path, sql_script))
    qry = open("{}{}".format(path, sql_script), 'rU').read()
    cursor.executescript(qry)


def insert_file_into_table(cursor, sql_script, file, sep=',', path='sql/'):
    print("[*] inserting {} into table with {}{}".format(file, path, sql_script))
    qry = open("{}{}".format(path, sql_script), 'rU').read()
    fileObj = open(file, 'rU', encoding='latin-1')
    csvReader = csv.reader(fileObj, delimiter=sep, quotechar='"')

    try:
        for row in csvReader:
            try:
                cursor.execute(qry, row)
            except sqlite3.IntegrityError as e:
                pass

    except Exception as e:
        print("[*] error while processing file: {}, error code: {}".format(file, e))
        print("[*] sed replacing null bytes in file: {}".format(file))
        sed_replace_null(file, "clean_null.sh")
        subprocess.call("bash clean_null.sh", shell=True)

        try:
            print("[*] inserting {} into table with {}{}".format(file, path, sql_script))
            fileObj = open(file, 'rU', encoding='latin-1')
            csvReader = csv.reader(fileObj, delimiter=sep, quotechar='"')
            for row in csvReader:
                try:
                    cursor.execute(qry, row)
                except sqlite3.IntegrityError as e:
                    pass
                    print(e)    

        except Exception as e:
            print("[*] error while processing file: {}, error code: {}".format(file, e))

User SQL scripts

--create_test.sql

DROP TABLE if exists test_indiv;

CREATE TABLE test_indiv (
    cmte_id TEXT NOT NULL,
    amndt_ind TEXT,
    rpt_tp TEXT,
    transaction_pgi TEXT,
    image_num TEXT,
    transaction_tp TEXT,
    entity_tp TEXT,
    name TEXT,
    city TEXT,
    state TEXT,
    zip_code TEXT,
    employer TEXT,
    occupation TEXT,
    transaction_dt TEXT,
    transaction_amt TEXT,
    other_id TEXT,
    tran_id TEXT,
    file_num NUMERIC,
    memo_cd TEXT,
    memo_text TEXT,
    sub_id NUMERIC NOT NULL
);

CREATE UNIQUE INDEX idx_test_indiv ON test_indiv (sub_id);

--insert_test.sql

INSERT INTO test_indiv (
    cmte_id,
    amndt_ind,
    rpt_tp,
    transaction_pgi,
    image_num,
    transaction_tp,
    entity_tp,
    name,
    city,
    state,
    zip_code,
    employer,
    occupation,
    transaction_dt,
    transaction_amt,
    other_id,
    tran_id,
    file_num,
    memo_cd,
    memo_text,
    sub_id
    ) 
VALUES (
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?,
    ?
);

I ran into exactly the same problem (dealing with more than 30 GB of data). Here is how I solved it: instead of using the chunk feature of read_sql, I decided to create a manual chunk looper like this:

chunksize = chunk_size
offset = 0
for _ in range(0, a_big_number):
    #note the LIMIT keyword and the offset that grows with each pass
    query = "SELECT * FROM the_table LIMIT %s OFFSET %s" % (chunksize, offset)
    df = pd.read_sql(query, conn)
    if len(df) != 0:
        ....  #process/write the chunk here
        offset += chunksize
    else:
        break
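
For illustration only, here is a self-contained sketch of this manual pagination idea applied to the example from the question (table, column, and chunk size taken from the question; this is not the answerer's exact code). Because each SELECT ... LIMIT ... OFFSET query is fully fetched before the write, no read cursor stays open across the to_sql() call, which appears to be what triggered the original error.

import pandas as pd
import sqlite3

db = sqlite3.connect("test.db")
new_table = "new_table"
chunksize = 5000
offset = 0

while True:
    #fetch one complete chunk; the read is finished before the write starts
    query = "SELECT * FROM test_table LIMIT {} OFFSET {}".format(chunksize, offset)
    df = pd.read_sql(query, db)
    if len(df) == 0:
        break
    df = df.rename(columns={"name": "renamed_name"})
    df.to_sql(new_table, con=db, if_exists="append", index=False)
    offset += chunksize

db.close()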