数据处理中多个文件的实时处理(PythonMultiprocessing)

Real-time handling of multiple files in data processing (Python Multiprocessing)

程序正在监控一个文件夹received_dir并实时处理接收到的文件。处理完文件后,将原文件删除,以节省磁盘空间space。 我正在尝试使用 Python multiprocessingPool。 我想检查当前方法是否存在任何技术缺陷。

当前代码中的一个问题是程序应该等到队列中的所有 20 个文件都被处理后再开始下一轮,因此在某些情况下(即各种文件大小)可能效率低下。

from multiprocessing import Pool
import os
import os.path

Parse_OUT="/opt/out/"
Receive_Dir="/opt/receive/"

def parser(infile):
    out_dir=date_of(filename)
    if not os.path.exists(out_dir):
        os.mkdir(out_dir)

    fout=gzip.open(out_dir+'/'+filename+'csv.gz','wb')
    with gzip.open(infile) as fin:
        for line in fin:
            data=line.split(',')
            fout.write(data)
    fout.close()
    os.remove(infile)

if __name__ == '__main__':
    pool=Pool(20)
    while True:
        targets=glob.glob(Receive_Dir)[:10]
        pool.map(parser, targets)
    pool.close()

我看到几个问题:

  1. if not os.path.exists(out_dir): os.mkdir(out_dir):这是一个竞争条件。如果两个 worker 试图同时创建同一个目录,其中一个将引发异常。不要做 if 条件。只需调用 os.makedirs(out_dir, exist_ok=True)

  2. 不要assemble文件路径加上字符串。只需执行 os.path.join(out_dir, filename+'csv.gz')。这更干净并且故障状态更少

  3. 即使没有新目录出现,也不必在 while True 循环中旋转,您可以使用 Linux 上的 inotify 机制来监视目录的更改。如果确实有任何事情要做,那只会唤醒您的流程。查看pyinotify:https://github.com/seb-m/pyinotify

  4. 既然您提到您对批处理不满意:您可以使用 pool.apply_async 在新操作可用时启动它们。您的主循环不会对结果做任何事情,因此您可以“即发即忘”

  5. 顺便说一下,为什么要启动一个有 20 个工作线程的池,然后一次只启动 10 个目录操作?