如何使用并行插入语句在 MySQL table 中插入巨大的 Pandas Dataframe?
How to Insert Huge Pandas Dataframe in MySQL table with Parallel Insert Statement?
我正在做一个项目,我必须编写一个包含数百万行和大约 25 列的数据框,其中大部分是数字类型。我正在使用 Pandas DataFrame to SQL Function 将数据帧转储到 Mysql table 中。我发现这个函数创建了一个可以一次插入多行的 Insert 语句。这是一个很好的方法,但 MySQL 对使用此方法可以构建的查询的长度有限制。
有没有办法在同一个 table 中并行插入,这样我就可以加快这个过程?
你可以做一些事情来实现这一点。
一种方法是在写入 sql 时使用附加参数。
df.to_sql(method = 'multi')
根据此 documentation,将 'multi' 传递给方法参数允许您批量插入。
另一种解决方案是使用 multiprocessing.dummy 构建自定义插入函数。
这是文档的 link :https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy
import math
from multiprocessing.dummy import Pool as ThreadPool
...
def insert_df(df, *args, **kwargs):
nworkers = 4 # number of workers that executes insert in parallel fashion
chunk = math.floor(df.shape[0] / nworkers) # number of chunks
chunks = [(chunk * i, (chunk * i) + chunk) for i in range(nworkers)]
chunks.append((chunk * nworkers, df.shape[0]))
pool = ThreadPool(nworkers)
def worker(chunk):
i, j = chunk
df.iloc[i:j, :].to_sql(*args, **kwargs)
pool.map(worker, chunks)
pool.close()
pool.join()
....
insert_df(df, "foo_bar", engine, if_exists='append')
在 上建议了第二种方法。
我正在做一个项目,我必须编写一个包含数百万行和大约 25 列的数据框,其中大部分是数字类型。我正在使用 Pandas DataFrame to SQL Function 将数据帧转储到 Mysql table 中。我发现这个函数创建了一个可以一次插入多行的 Insert 语句。这是一个很好的方法,但 MySQL 对使用此方法可以构建的查询的长度有限制。
有没有办法在同一个 table 中并行插入,这样我就可以加快这个过程?
你可以做一些事情来实现这一点。
一种方法是在写入 sql 时使用附加参数。
df.to_sql(method = 'multi')
根据此 documentation,将 'multi' 传递给方法参数允许您批量插入。
另一种解决方案是使用 multiprocessing.dummy 构建自定义插入函数。 这是文档的 link :https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy
import math
from multiprocessing.dummy import Pool as ThreadPool
...
def insert_df(df, *args, **kwargs):
nworkers = 4 # number of workers that executes insert in parallel fashion
chunk = math.floor(df.shape[0] / nworkers) # number of chunks
chunks = [(chunk * i, (chunk * i) + chunk) for i in range(nworkers)]
chunks.append((chunk * nworkers, df.shape[0]))
pool = ThreadPool(nworkers)
def worker(chunk):
i, j = chunk
df.iloc[i:j, :].to_sql(*args, **kwargs)
pool.map(worker, chunks)
pool.close()
pool.join()
....
insert_df(df, "foo_bar", engine, if_exists='append')
在 上建议了第二种方法。