Process very large 900M row MySQL table line by line with Python

I often need to process several hundred million rows of a MySQL table line by line with Python. I want a script that is robust and does not need to be babysat.

Below I have pasted a script that classifies the language of the message field in my rows. It makes use of the sqlalchemy and MySQLdb.cursors.SSCursor modules. Unfortunately this script consistently throws a 'Lost connection to MySQL server during query' error after 4840 rows when I run it remotely, and after 42000 rows when I run it locally.

Also, I have already checked that max_allowed_packet = 32M in my MySQL server's /etc/mysql/my.cnf file, per the answers to this Stack Overflow question: Lost connection to MySQL server during query
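
For context, the relevant line in /etc/mysql/my.cnf looks like this (a minimal sketch; the [mysqld] section name assumes a stock Debian/Ubuntu layout):

[mysqld]
max_allowed_packet = 32M
# net_read_timeout / net_write_timeout are other server variables sometimes
# associated with 'Lost connection' errors; checking them is only a guess here,
# not something verified for this particular failure.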

Any advice on fixing this error, or on another approach to processing very large MySQL tables with Python in a robust way, would be greatly appreciated!

import sqlalchemy
import MySQLdb.cursors
import langid

schema = "twitterstuff"
table = "messages_en" #900M row table
engine_url = "mysql://myserver/{}?charset=utf8mb4&read_default_file=~/.my.cnf".format(schema)
db_eng = sqlalchemy.create_engine(engine_url, connect_args={'cursorclass': MySQLdb.cursors.SSCursor} )
langid.set_languages(['fr', 'de'])

print "Executing input query..."
data_iter = db_eng.execute("SELECT message_id, message FROM {} WHERE langid_lang IS NULL LIMIT 10000".format(table))

def process(inp_iter):
    for item in inp_iter:
        item = dict(item)
        (item['langid_lang'], item['langid_conf']) = langid.classify(item['message'])
        yield item

def update_table(update_iter):
    count = 0
    for item in update_iter:
        count += 1
        if count%10 == 0:
            print "{} rows processed".format(count)
        lang = item['langid_lang']
        conf = item['langid_conf']
        message_id = item['message_id']
        db_eng.execute("UPDATE {} SET langid_lang = '{}', langid_conf = {} WHERE message_id = {}".format(table, lang, conf, message_id))

data_iter_upd = process(data_iter)

print "Begin processing..."
update_table(data_iter_upd)

According to MySQLdb developer Andy Dustman:

[When using SSCursor,] no new queries can be issued on the connection until the entire result set has been fetched.

That post indicates that if you issue another query you should get a "commands out of sequence" error, which is not the error you are seeing. So I am not sure the following will necessarily fix your problem. Still, it may be worth removing SSCursor from your code and using the simpler default Cursor, just to test whether that is the source of the problem.
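
For that test, a minimal sketch (reusing the same connection URL as in your script, assumed unchanged) would simply drop the connect_args so MySQLdb falls back to its default cursor:

import sqlalchemy

schema = "twitterstuff"
engine_url = ("mysql://myserver/{}?charset=utf8mb4&read_default_file=~/.my.cnf"
              .format(schema))

# No cursorclass override: MySQLdb's default Cursor buffers the whole result
# set on the client, so issuing UPDATEs while iterating over it is not a problem.
db_eng = sqlalchemy.create_engine(engine_url)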

For example, you could use LIMIT chunksize OFFSET n in your SELECT statement to loop through the dataset in chunks:

import sqlalchemy
import MySQLdb.cursors
import langid
import itertools as IT
chunksize = 1000

def process(inp_iter):
    for item in inp_iter:
        item = dict(item)
        (item['langid_lang'], item['langid_conf']) = langid.classify(item['message'])
        yield item

def update_table(update_iter, engine):
    for count, item in enumerate(update_iter):
        if count%10 == 0:
            print "{} rows processed".format(count)
        lang = item['langid_lang']
        conf = item['langid_conf']
        message_id = item['message_id']
        engine.execute(
            "UPDATE {} SET langid_lang = '{}', langid_conf = {} WHERE message_id = {}"
            .format(table, lang, conf, message_id))

schema = "twitterstuff"
table = "messages_en" #900M row table
engine_url = ("mysql://myserver/{}?charset=utf8mb4&read_default_file=~/.my.cnf"
              .format(schema))

db_eng = sqlalchemy.create_engine(engine_url)
langid.set_languages(['fr', 'de'])

for offset in IT.count(start=0, step=chunksize):
    print "Executing input query..."
    result = db_eng.execute(
        "SELECT message_id, message FROM {} WHERE langid_lang IS NULL LIMIT {} OFFSET {}"
        .format(table, chunksize, offset))
    result = list(result)
    if not result: break
    data_iter_upd = process(result)

    print "Begin processing..."
    update_table(data_iter_upd, db_eng)
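
One design note on the chunking loop above: because each pass sets langid_lang on the rows it touches, those rows stop matching the WHERE langid_lang IS NULL filter, so advancing OFFSET by chunksize can end up skipping rows that have not been processed yet. A variant you could try (my own assumption, not part of the original suggestion) is to leave the offset at 0 and simply re-run the query until it returns nothing, since processed rows fall out of the result set on their own:

while True:
    # Processed rows no longer satisfy langid_lang IS NULL, so no OFFSET is needed.
    result = list(db_eng.execute(
        "SELECT message_id, message FROM {} WHERE langid_lang IS NULL LIMIT {}"
        .format(table, chunksize)))
    if not result:
        break
    update_table(process(result), db_eng)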