无法在嵌套循环中使用 pandas 附加更大的数据帧。如何更改为numpy矢量化?

Failing to append bigger data frames with pandas in nested loops. How to change to numpy vectorization?

我需要从旧的 postgres 数据库加载一个巨大的 table (6 gb),其中包含一些我需要在加载时删除的错误值。所以我写了一个循环,出于性能原因尝试加载更大的块,但逐步减少以隔离和丢弃错误的值。通常这可行,但在大约 500 k 记录后,性能会迅速下降。

我已经发现不建议使用 pandas 处理更大的数据集。这就是我尝试使用 numpy 的原因。但这并没有改变任何事情。然后我尝试使用列表理解,但失败了,因为我必须使用异常来尝试以较小的块进行迭代。

在我看来,numpy 向量化看起来是个好主意,但我不知道如何让它发挥作用。

https://towardsdatascience.com/how-to-make-your-pandas-loop-71-803-times-faster-805030df4f06

总的来说这部分我想加快速度。

df = pds.read_sql_query(sql,conn,params=[(i * chunksize), chunksize])
appended_df.append(df)
products_df = pds.concat(appended_df, ignore_index=True)

如果上面的代码片段不够完整,您可以在下面找到更多内容。

# set autocommit = True
conn = pyodbc.connect(conn_str, autocommit=True)

cur = conn.cursor()

# count rows for chunking
sql_count = """\
select count("item_no") from "products" 
"""
cur.execute(sql_count)
sql_row_counter = cur.fetchone()[0]
print("Total rows: " + str(sql_row_counter))

# define chunksize and calculate chunks
chunksize = 35000
chunk_divisor = 100
if chunksize / chunk_divisor < 1:
    chunk_divisor = chunksize
print("Chunk devisor on error: " + str(chunk_divisor))
chksz_lvl2 = int(chunksize / chunk_divisor)
if chksz_lvl2 < 1:
    chksz_lvl2 = 1
chksz_lvl3 = int(chksz_lvl2 / chunk_divisor)
if chksz_lvl3 < 1:
    chksz_lvl3 = 1
# print settings for iteration
print("Chunksize: " + str(chunksize) + "\nChunksize Level 2: " +
       str(chksz_lvl2) + "\nChunksize Level 3: " + str(chksz_lvl3))
chunks = int(sql_row_counter / chunksize)
# Uncomment next row for testpurposes
chunks = 25
print("Chunks: " + str(chunks) + "\n")
error_counter = 0
# iterate chunks
appended_df = []
print("Starting to iterate chunks.\nPlease wait...")

for i in range(0, chunks):
            # try to iterate in full speed
            print("\nNext chunk starts from " + str((i * chunksize)) +
                  " with an limit of " + str(chunksize) + ".")
            try:
                # start runtime measurment
                i_start = time.time()
                # sql statement
                sql = """\
                select "item_no", "description_1", "description_2", "description_3" FROM "products" order by "item_no" offset ? limit ?"""
                # store into dataframe
                df = pds.read_sql_query(sql,
                                        conn,
                                        params=[(i * chunksize), chunksize])
                # get first and last value from dataframe
                head = df["item_no"].iloc[0]
                tail = df["item_no"].iloc[-1]
                # store query
                # Appending data frames via pandas.append() suddenly becomes slower by a factor of 10 from approx. 500,000 data records per 4 columns.
                appended_df.append(df)
                # stop runtime measurement
                i_end = time.time()
                # print result
                print(
                    str(i + 1) + " out of " + str(chunks) + " chunks in " +
                    "{:5.3f}s".format(i_end - i_start) + " processed.")
            except:
                # collect error information
                print(
                    "\nChunk " + str(i + 1) +
                    " cannot be selected due to an error. Reduce chunk size from "
                    + str(chunksize) + " to " + str(chksz_lvl2) +
                    ". Entering level 2.\nFirst working item_no of last working chunk "
                    + str(head) +
                    "\nLast working item_no of last working chunk " +
                    str(tail))
                ### 2 ### Successively reduce the chunks to narrow down and isolate errors.
                for j in range(0, chunk_divisor):
                     
                  and so on...
                             ...
                                ...
                                   ...
# Merge chunks
print("\nNote: Chunkzize = from row_no to row_no. Could be 1,2,3,4 = range of 4 or compleley different. Ex. 2,45,99,1002 = range of 4.\n\nConcatinate chunks.")
products_df = pds.DataFrame()
products_df = pds.concat(appended_df, ignore_index=True)
print("Done. " + str(error_counter) +
" rows had to be skipped. Details can be found in the full error log.")

conn.close()

我刚刚注意到 python 脚本已经按预期 运行ning。像 Dask 这样的其他框架没有任何机会改进这一点。在我的例子中,我想获取一些数据的源 Postgres 数据库(在我的例子中是 9.x)有一个关于同时使用 limitorder by 的问题在查询巨大的 tables.

期间

我无法直接检测到这一点,因为我的 SQL 查询工具 (DBeaver) 即使您想查询完整的 table 也只加载一个子集来显示。因此,结果是一个虚假的朋友。如果您想通过订购正确地检查 运行 一个带有相当大 offsetlimit 的短 select。

偏移量约为。 500 k 条记录,在我的例子中,只有一条记录到 select 的时间大约需要 10 秒。

解决方案是在我的嵌入式 SQL 脚本中的“尝试”部分删除 order by