Failing to append bigger data frames with pandas in nested loops. How to change to numpy vectorization?
I need to load a huge table (6 GB) from an older Postgres database that contains some bad values which I need to drop while loading. So I wrote a loop that, for performance reasons, tries to load larger chunks and then steps down to smaller ones to isolate and discard the bad values. Generally this works, but after roughly 500 k records the performance drops rapidly.
I have already found out that pandas is not recommended for larger data sets, which is why I tried numpy. But that did not change anything. Then I tried a list comprehension, but failed, because I have to use exceptions to retry the iteration with smaller chunks.
From my point of view numpy vectorization looks like a good idea, but I don't know how to make it work.
https://towardsdatascience.com/how-to-make-your-pandas-loop-71-803-times-faster-805030df4f06
Overall, this is the part I would like to speed up:
df = pds.read_sql_query(sql, conn, params=[(i * chunksize), chunksize])
appended_df.append(df)
products_df = pds.concat(appended_df, ignore_index=True)
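For what it's worth, the snippet above already uses the generally faster pattern of collecting chunks in a list and concatenating once at the end. A minimal sketch of the quadratic alternative it avoids, for comparison only (the make_chunk() helper is hypothetical and just stands in for one database read):

import pandas as pds

def make_chunk(i, size=1000):
    # hypothetical helper standing in for one chunk read from the database
    return pds.DataFrame({"item_no": range(i * size, (i + 1) * size)})

# slow: re-concatenating the growing frame copies all previous rows on every iteration
slow_df = pds.DataFrame()
for i in range(100):
    slow_df = pds.concat([slow_df, make_chunk(i)], ignore_index=True)

# fast: collect the chunks in a plain list and concatenate exactly once
chunks = [make_chunk(i) for i in range(100)]
fast_df = pds.concat(chunks, ignore_index=True)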
In case the snippet above is not complete enough, you can find more context below.
import time

import pandas as pds
import pyodbc

# set autocommit = True
conn = pyodbc.connect(conn_str, autocommit=True)
cur = conn.cursor()
# count rows for chunking
sql_count = """\
select count("item_no") from "products"
"""
cur.execute(sql_count)
sql_row_counter = cur.fetchone()[0]
print("Total rows: " + str(sql_row_counter))
# define chunksize and calculate chunks
chunksize = 35000
chunk_divisor = 100
if chunksize / chunk_divisor < 1:
    chunk_divisor = chunksize
print("Chunk divisor on error: " + str(chunk_divisor))
chksz_lvl2 = int(chunksize / chunk_divisor)
if chksz_lvl2 < 1:
    chksz_lvl2 = 1
chksz_lvl3 = int(chksz_lvl2 / chunk_divisor)
if chksz_lvl3 < 1:
    chksz_lvl3 = 1
# print settings for iteration
print("Chunksize: " + str(chunksize) + "\nChunksize Level 2: " +
str(chksz_lvl2) + "\nChunksize Level 3: " + str(chksz_lvl3))
chunks = int(sql_row_counter / chunksize)
# Limit chunks for test purposes; comment out the next row for a full run
chunks = 25
print("Chunks: " + str(chunks) + "\n")
error_counter = 0
# iterate chunks
appended_df = []
print("Starting to iterate chunks.\nPlease wait...")
for i in range(0, chunks):
    # try to iterate at full speed
    print("\nNext chunk starts from " + str((i * chunksize)) +
          " with a limit of " + str(chunksize) + ".")
    try:
        # start runtime measurement
        i_start = time.time()
        # sql statement
        sql = """\
select "item_no", "description_1", "description_2", "description_3" FROM "products" order by "item_no" offset ? limit ?"""
        # store into dataframe
        df = pds.read_sql_query(sql,
                                conn,
                                params=[(i * chunksize), chunksize])
        # get first and last value from dataframe
        head = df["item_no"].iloc[0]
        tail = df["item_no"].iloc[-1]
        # store query
        # Appending data frames via pandas.append() suddenly becomes slower by a factor of 10 from approx. 500,000 data records per 4 columns.
        appended_df.append(df)
        # stop runtime measurement
        i_end = time.time()
        # print result
        print(
            str(i + 1) + " out of " + str(chunks) + " chunks in " +
            "{:5.3f}s".format(i_end - i_start) + " processed.")
    except Exception:
        # collect error information
        print(
            "\nChunk " + str(i + 1) +
            " cannot be selected due to an error. Reduce chunk size from "
            + str(chunksize) + " to " + str(chksz_lvl2) +
            ". Entering level 2.\nFirst working item_no of last working chunk "
            + str(head) +
            "\nLast working item_no of last working chunk " +
            str(tail))
        ### 2 ### Successively reduce the chunks to narrow down and isolate errors.
        for j in range(0, chunk_divisor):
            # and so on ...
            ...
# Merge chunks
print("\nNote: Chunkzize = from row_no to row_no. Could be 1,2,3,4 = range of 4 or compleley different. Ex. 2,45,99,1002 = range of 4.\n\nConcatinate chunks.")
products_df = pds.DataFrame()
products_df = pds.concat(appended_df, ignore_index=True)
print("Done. " + str(error_counter) +
" rows had to be skipped. Details can be found in the full error log.")
conn.close()
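As a side note on the loading pattern itself, pandas can also drive the chunking via the chunksize parameter of read_sql_query, which returns an iterator of DataFrames. A minimal sketch, assuming the same pyodbc connection conn; the per-chunk error isolation of the nested loops above would still have to be wrapped around the iteration:

# a minimal sketch, assuming the same pyodbc connection `conn`;
# pandas slices the result into DataFrames of `chunksize` rows itself,
# so no manual offset/limit bookkeeping is needed
sql_all = 'select "item_no", "description_1", "description_2", "description_3" from "products"'
appended_df = []
for chunk in pds.read_sql_query(sql_all, conn, chunksize=35000):
    appended_df.append(chunk)
products_df = pds.concat(appended_df, ignore_index=True)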
I just noticed that the python script had already been running as intended. Other frameworks like Dask would not have had any chance to improve on this. In my case the source Postgres database (9.x in my case) from which I want to fetch some data has a problem with using limit and order by at the same time while querying huge tables.
I could not detect this directly, because my SQL query tool (DBeaver) only loads a subset of the rows for display, even if you want to query the full table. Therefore the result is a false friend. If you want to check this properly, run a short select with ordering and a fairly large offset and limit.
With an offset of approx. 500 k records, selecting just a single record took about 10 seconds in my case.
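To illustrate that check, a small sketch that reuses the cur cursor from the snippet above and times a one-row select with ordering at a large offset (the 500000 is only an example value taken from the description):

import time

# time a single-row select with ordering at a large offset (example value)
probe_sql = 'select "item_no" from "products" order by "item_no" offset ? limit ?'
t0 = time.time()
cur.execute(probe_sql, (500000, 1))
row = cur.fetchone()
print("Fetched " + str(row) + " in {:5.3f}s".format(time.time() - t0))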
The solution was to remove the order by in the "try" part of my embedded SQL script.
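For reference, that fix amounts to dropping the order by from the statement in the try block. Note that without an order by, Postgres does not guarantee a stable row order across the offset/limit chunks:

# the statement from the try block with the order by removed
sql = """\
select "item_no", "description_1", "description_2", "description_3" FROM "products" offset ? limit ?"""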