与 pandas 和 GNU Parallel 并行地将数据导入 MySQL
Importing data to MySQL in parallel with pandas and GNU Parallel
我有几千个目录,我想从中将数据导入到 MySQL。我创建了一个 python 脚本,它从单个目录读取数据并将其放入数据库。这是将数据发送到数据库的部分:
host = 'localhost'
engine = create_engine('mysql://user:pass@%s/db?charset=utf8' % host)
conn = engine.connect()
trans = conn.begin()
try:
conn.execute('delete from tests where ml="%s"' % ml)
tests.to_sql(con=conn, name='tests', if_exists='append', index=False)
data.to_sql(con=conn, name='data', if_exists='append', index=False)
trans.commit()
print(CGRE + ml + ': OK' + CEND)
except:
trans.rollback()
print(CRED + ml + ': database error!' + CEND)
raise
conn.close()
单线程执行效果很好但太慢:
parallel -j 1 "[[ -d {} ]] && (cd {} && data_to_db.py) || echo {} >> ~/Data/failed_db" ::: *
现在我想启动几个进程:
parallel -j 8 .........
有时在执行过程中我会得到这个错误:
sqlalchemy.exc.InternalError: (pymysql.err.InternalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
有没有办法增加交易的等待时间或其他方式解决,因为没有并行执行,导入所有数据的时间太长?
非常感谢@RomanPerekhrest,这是 MySQL 手册中使用 LOCK/UNLOCK TABLES
.
的工作 solution
engine = create_engine('mysql://user:pass@%s/db?charset=utf8' % host)
conn = engine.connect()
trans = conn.begin()
try:
conn.execute('set autocommit=0')
conn.execute('lock tables tests write, data write')
conn.execute('delete from tests where ml="%s"' % ml)
tests.to_sql(con=conn, name='tests', if_exists='append', index=False)
data.to_sql(con=conn, name='data', if_exists='append', index=False)
trans.commit()
conn.execute('unlock tables')
print(CGRE + ml + ': OK' + CEND)
except:
trans.rollback()
conn.execute('unlock tables')
conn.close()
print(CRED + ml + ': database error!' + CEND)
raise
conn.close()
我有几千个目录,我想从中将数据导入到 MySQL。我创建了一个 python 脚本,它从单个目录读取数据并将其放入数据库。这是将数据发送到数据库的部分:
host = 'localhost'
engine = create_engine('mysql://user:pass@%s/db?charset=utf8' % host)
conn = engine.connect()
trans = conn.begin()
try:
conn.execute('delete from tests where ml="%s"' % ml)
tests.to_sql(con=conn, name='tests', if_exists='append', index=False)
data.to_sql(con=conn, name='data', if_exists='append', index=False)
trans.commit()
print(CGRE + ml + ': OK' + CEND)
except:
trans.rollback()
print(CRED + ml + ': database error!' + CEND)
raise
conn.close()
单线程执行效果很好但太慢:
parallel -j 1 "[[ -d {} ]] && (cd {} && data_to_db.py) || echo {} >> ~/Data/failed_db" ::: *
现在我想启动几个进程:
parallel -j 8 .........
有时在执行过程中我会得到这个错误:
sqlalchemy.exc.InternalError: (pymysql.err.InternalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
有没有办法增加交易的等待时间或其他方式解决,因为没有并行执行,导入所有数据的时间太长?
非常感谢@RomanPerekhrest,这是 MySQL 手册中使用 LOCK/UNLOCK TABLES
.
engine = create_engine('mysql://user:pass@%s/db?charset=utf8' % host)
conn = engine.connect()
trans = conn.begin()
try:
conn.execute('set autocommit=0')
conn.execute('lock tables tests write, data write')
conn.execute('delete from tests where ml="%s"' % ml)
tests.to_sql(con=conn, name='tests', if_exists='append', index=False)
data.to_sql(con=conn, name='data', if_exists='append', index=False)
trans.commit()
conn.execute('unlock tables')
print(CGRE + ml + ': OK' + CEND)
except:
trans.rollback()
conn.execute('unlock tables')
conn.close()
print(CRED + ml + ': database error!' + CEND)
raise
conn.close()