在 python 中高效处理约 5000 万个记录文件

Efficiently processing ~50 million record file in python

我们有一个包含大约 4600 万条 CSV 格式记录的文件。每条记录大约有 18 个字段,其中一个是 64 字节的 ID。我们还有另一个文件,其中包含大约 167K 个唯一 ID。 ID对应的记录需要被抽出。因此,我们编写了一个 python 程序,将 167K ID 读入数组并处理 4600 万条记录文件,检查这些记录中是否存在 ID。这是代码片段:

import csv
...
csvReadHandler = csv.reader(inputFile, delimiter=chr(1))
csvWriteHandler = csv.writer(outputFile, delimiter=chr(1), lineterminator='\n')
for fieldAry in csvReadHandler:
    lineCounts['orig'] += 1
    if fieldAry[CUSTOMER_ID] not in idArray:
        csvWriteHandler.writerow(fieldAry)
        lineCounts['mod'] += 1

在一小组数据上测试了程序,这里是处理时间:

lines: 117929 process time: 236.388447046 sec
lines: 145390 process time: 277.075321913 sec

我们已经在美国东部时间昨晚 ~3:00am 启动了 运行 4600 万条记录文件(大约 13 GB 大小)的程序,现在是美国东部时间上午 10 点左右,它仍在处理中!

问题:

  1. 是否有更好的方法来处理这些记录以缩短处理时间?
  2. python是正确的选择吗? awk 或其他工具会有帮助吗?
  3. 我猜测在以下语句中对 167K 数组进行 64 字节 ID 查找是罪魁祸首:
    if fieldAry[CUSTOMER_ID] not in idArray:

有更好的选择吗?

谢谢!

更新:这是在带有 EBS 附加卷的 EC2 实例上处理的。

稍微加快速度的最简单方法是使用某些分布式解决方案并行处理线路。最简单的方法之一是使用 multiprocessing.Pool。你应该这样做(语法未检查):

from multiprocessing import Pool

p = Pool(processes=4)
p.map(process_row, csvReadHandler)

尽管如此,python 并不是执行这种批处理的最佳语言(主要是因为写入磁盘非常慢)。最好将所有磁盘写入管理(缓冲、排队等)留给 linux 内核,因此使用 bash 解决方案会更好。最有效的方法是将输入文件分成块,然后简单地执行反向 grep 来过滤 ID。

for file in $list_of_splitted_files; then
  cat $file | grep -v (id1|id2|id3|...) > $file.out
done;

如果您之后需要简单地合并:

for file in $(ls *.out); then
  cat $file >> final_results.csv
done

注意事项:

  • 不知道对所有 ID 执行一次 grep 是否 more/less 比遍历所有 id 并执行单个 id 更有效 grep.
  • 编写并行解决方案时尝试read/write不同 文件以最小化 I/O 瓶颈(所有线程都试图写入 同一个文件)在所有语言中。
  • 在代码中为每个处理部分设置计时器。这样你就会看到哪个部分浪费了更多时间。我 真的推荐这个 因为我有一个类似的程序要写,我认为处理部分是瓶颈(类似于你与 ids 向量的比较)但实际上它是 I/O 这拖累了所有的执行。

应该必须使用set而不是list for 循环之前:

idArray = set(idArray)

csvReadHandler = csv.reader(inputFile, delimiter=chr(1))
csvWriteHandler = csv.writer(outputFile, delimiter=chr(1), lineterminator='\n')
for fieldAry in csvReadHandler:
    lineCounts['orig'] += 1
    if fieldAry[CUSTOMER_ID] not in idArray:
        csvWriteHandler.writerow(fieldAry)
        lineCounts['mod'] += 1

并看到令人难以置信的加速;你正在使用 的不必要的处理时间只是因为你选择了错误的数据结构。


in 运算符 set 具有 O(1) 时间复杂度,而 O(n) list 的时间复杂度。这可能听起来像 "not a big deal" 但实际上 它是脚本中的瓶颈 。尽管 set 对于 O 会有更高的常量。因此,您的代码在单个 in 操作上使用的时间比必要的多 30000 次。如果在最佳版本中它需要 30 秒,现在您仅在这一行上就花费了 10 天。

看下面的测试:我生成了 100 万个 ID,然后将 10000 个放到另一个列表中 - to_remove。然后我像你一样做一个 for 循环,对每条记录做 in 操作:

import random
import timeit

all_ids = [random.randint(1, 2**63) for i in range(1000000)]
to_remove = all_ids[:10000]
random.shuffle(to_remove)
random.shuffle(all_ids)


def test_set():
    to_remove_set = set(to_remove)
    for i in all_ids:
        if i in to_remove_set:
            pass

def test_list():
    for i in all_ids:
        if i in to_remove:
            pass


print('starting')
print('testing list', timeit.timeit(test_list, number=1))
print('testing set', timeit.timeit(test_set, number=1))

结果:

testing list 227.91903045598883
testing set 0.14897623099386692

set版本耗时149毫秒; list 版本需要 228 秒。现在这些都是小数字:在你的例子中,你有 5000 万条输入记录,而我有 100 万条;因此,您需要将 testing set 时间乘以 50:对于您的数据集,大约需要 7.5 秒。

另一方面,列表版本需要将该时间乘以 50 * 17 - 不仅输入记录多 50 倍,而且要匹配的记录多 17 倍。因此我们得到 227 * 50 * 17 = 192950.

所以你的算法花了 2.2 天的时间来做一些使用正确的数据结构可以在 7.5 秒内完成的事情。当然,这并不意味着您可以在 7.5 秒 内扫描完整个 50 GB 文档,但它可能不会超过 2.2 天。所以我们从:

             2 days                           2.2 days 
 |reading and writing the files||------- doing id in list ------|

类似于

             2 days            7.5 seconds (doing id in set)
 |reading and writing the files||

免责声明: 不要在没有解释原因的情况下投反对票,因为 OP 没有包括他的整个代码库或 hardware/infrastructure 设计。但是,如果我在我的代码或逻辑中犯了严重错误,请解释它们并相应地投反对票。

让我们从定义您将遇到的瓶颈开始(有些很明显,有些则没有)。

  • 硬盘驱动器 - 它们很慢并且不会缓存大量数据
  • 多次重新读取相同的数据
  • 内存,你不能存储 13GB 的文件,或者你可以,这是一个选项?

要解决这些问题,你可以走多条路。

  1. 一种明显有益的方法是将大数据读入数据库(例如 postgresql 或 mariadb)。但我认为目前这根本不可能。

关于 CSV reader,它们很好,但可能效率不高。
既然你无论如何都要通读这两个文件,我会考虑以下几点:

  1. 逐行读取13GB的文件,不检查key/value是否存在,将ID存入字典。 (为什么?因为检查该值是否存在比仅仅覆盖它要慢,而且字典还有一个额外的好处,即键是唯一的,因此重复项将被淘汰) 将其添加到 set() 中,正如许多其他人所描述的那样。

    然后逐行读取较小的文件并检查您的 dictset 是否包含 ID.

dict() 对比 set() 对比 list()

下面是set()list()dict()三种数据类型的比较:
使用的代码:test.py

(11.719028949737549, 999950, 'Using dict() for inserts')
(1.8462610244750977, 'dict() time spent looking through the data')

(11.793760061264038, 999961, 'Using set() for inserts')
(7.019757986068726, 'set() time spent looking through the data')

(15min+, 'Using list()')  # Honestly, I never let it run to it's finish.. It's to slow.

如您所见,dictset 稍微快一点,而 list() 完全落后(请参阅 对原因的描述)。我应该指出我的数据有点偏差,因为它是速度差异的快速而肮脏的演示,但总体思路应该仍然存在。

使用 dict() 作为解决方案

因此,如果您无权访问源数据的数据库版本,并且您需要使用 Python,请使用以下代码:

delimiter = b'\x01'
big_source = {}
with open('13_gig_source.log', 'rb') as fh:
    for line in fh:
        csv = line.split(delimiter)
        big_source[csv[4]] = None # 5:th column, change to match CUSTOMER_ID

output = open('result.log', 'wb')
with open('smaller_file.log', 'rb') as fh:
    for line in fh:
        csv = line.split(delimiter)
        if csv[4] in big_source:
            output.write(csv[4] + b'\n')
output.close()

因为我不知道数据存在于哪一列,所以我没有优化 split()
例如,如果它是您要获取的最后一列,请改为执行 line.rsplit(delimiter, 1)[-1]。或者,如果它是 3:d 列,则执行 line.split(delimiter, 3)[2],因为它将中止在 split() 函数中查找 delimiter 的下一个位置的过程。

使用 linux 工具

是的,某些工具可能更适合此操作,例如 awk,因为它是用 C 语言编写的特定工具,用于执行非常特定的任务。即使 Python 基于 C,它在 C 代码之上仍然有很多抽象层,并且在大多数情况下会比为特定任务编写的对应 C 工具慢。

现在我没有数据来测试它,我也不是 PRO With Nix-Commands 或简称 PWN-C。所以我会让其他人给你举个例子,但我发现了这个:

  • merge two csv files according to matching rows and add new columns in linux

这可能会有帮助。

我觉得最好用数据库来解决这个问题 首先创建一个数据库,如 MySql 或其他任何东西 然后将文件中的数据写入 2 table 最后使用一个简单的 sql 查询 select 行 像这样的东西: select * 来自 table1 其中 id in(select 来自 table2 的 id)