python 中使用池 class 的文本文件记录计数

Text file record count using pool class in python

我有我的程序,它列出并读取目录中的所有文件,并同时计算文件中存在的记录总数。

当我运行下面的代码时,我得到了一些工作线程名称列表,其中计数以块的形式出现,因为来自多个文件的记录计数也是并行的。

import multiprocessing as mp
import time
import os
path = '/home/vaibhav/Desktop/Input_python'

def process_line(f):
    print(mp.current_process())
    #print("process id = " , os.getpid(f))
    print(sum(1 for line in f))

for filename in os.listdir(path):
    print(filename)

    if __name__ == "__main__":

        with open('/home/vaibhav/Desktop/Input_python/'+ filename, "r+") as source_file:
            # chunk the work into batches

            p = mp.Pool()
            results = p.map(process_line, source_file)

start_time = time.time()
print("My program took", time.time() - start_time, "to run")

当前输出

<ForkProcess(ForkPoolWorker-54, started daemon)>
73
<ForkProcess(ForkPoolWorker-55, started daemon)>
<ForkProcess(ForkPoolWorker-56, started daemon)>
<ForkProcess(ForkPoolWorker-53, started daemon)>
73
1
<ForkProcess(ForkPoolWorker-53, started daemon)>
79
<ForkProcess(ForkPoolWorker-54, started daemon)>
<ForkProcess(ForkPoolWorker-56, started daemon)>
<ForkProcess(ForkPoolWorker-55, started daemon)>
79
77
77

有没有办法让我可以得到像

这样的文件的总记录数
File1.Txt Total_Recordcount
...
Filen.txt  Total_Recordcount

更新 我得到了解决方案并将答案粘贴在评论部分。

文本文件中的计数行不应 CPU 绑定,因此它不适合线程化。您可能想使用线程池来处理多个独立文件,但对于单个文件,这里有一种计算行数的方法应该非常快:

import pandas as pd
data = pd.read_table(source_file, dtype='S1', header=None, usecols=[0])
count = len(data)

这样做是将第一个字符(S1)解析成一个DataFrame,然后检查长度。解析器是用 C 实现的,因此不需要慢 Python 循环。这应该提供接近最佳的速度,仅受您的磁盘子系统限制。

这完全回避了原来的问题,因为现在每个文件只有一个计数。

早些时候我正在读取文件并一次为单个文件生成多个进程,这导致了文件块的记录计数。

但现在我改变了我的方法,目前我将一个文件列表作为可迭代传递给 pool.map() 函数,该函数为列表中的所有不同文件释放多个进程并给我更好的结果以 运行 时间表示。这是我参考的 ,下面是粘贴和更正的代码。

import multiprocessing  as mp
from multiprocessing import Pool
import os
import time
folder = '/home/vaibhav/Desktop/Input_python'

fnames = (name for name in os.listdir(folder))
def file_wc(fname):
    with open('/home/vaibhav/Desktop/Input_python/'+ fname) as f:
        count = sum(1 for line in f)
    return (fname,count)   
pool = Pool()    
print(dict(pool.map(file_wc, list(fnames))))
pool.close()
pool.join()
start_time = time.time()
print("My program took", time.time() - start_time, "to run")