数据处理中多个文件的实时处理(PythonMultiprocessing)
Real-time handling of multiple files in data processing (Python Multiprocessing)
程序正在监控一个文件夹received_dir
并实时处理接收到的文件。处理完文件后,将原文件删除,以节省磁盘空间space。
我正在尝试使用 Python
multiprocessing
和 Pool
。
我想检查当前方法是否存在任何技术缺陷。
当前代码中的一个问题是程序应该等到队列中的所有 20 个文件都被处理后再开始下一轮,因此在某些情况下(即各种文件大小)可能效率低下。
from multiprocessing import Pool
import os
import os.path
Parse_OUT="/opt/out/"
Receive_Dir="/opt/receive/"
def parser(infile):
out_dir=date_of(filename)
if not os.path.exists(out_dir):
os.mkdir(out_dir)
fout=gzip.open(out_dir+'/'+filename+'csv.gz','wb')
with gzip.open(infile) as fin:
for line in fin:
data=line.split(',')
fout.write(data)
fout.close()
os.remove(infile)
if __name__ == '__main__':
pool=Pool(20)
while True:
targets=glob.glob(Receive_Dir)[:10]
pool.map(parser, targets)
pool.close()
我看到几个问题:
if not os.path.exists(out_dir): os.mkdir(out_dir)
:这是一个竞争条件。如果两个 worker 试图同时创建同一个目录,其中一个将引发异常。不要做 if 条件。只需调用 os.makedirs(out_dir, exist_ok=True)
不要assemble文件路径加上字符串。只需执行 os.path.join(out_dir, filename+'csv.gz')
。这更干净并且故障状态更少
即使没有新目录出现,也不必在 while True 循环中旋转,您可以使用 Linux 上的 inotify 机制来监视目录的更改。如果确实有任何事情要做,那只会唤醒您的流程。查看pyinotify:https://github.com/seb-m/pyinotify
既然您提到您对批处理不满意:您可以使用 pool.apply_async
在新操作可用时启动它们。您的主循环不会对结果做任何事情,因此您可以“即发即忘”
顺便说一下,为什么要启动一个有 20 个工作线程的池,然后一次只启动 10 个目录操作?
程序正在监控一个文件夹received_dir
并实时处理接收到的文件。处理完文件后,将原文件删除,以节省磁盘空间space。
我正在尝试使用 Python
multiprocessing
和 Pool
。
我想检查当前方法是否存在任何技术缺陷。
当前代码中的一个问题是程序应该等到队列中的所有 20 个文件都被处理后再开始下一轮,因此在某些情况下(即各种文件大小)可能效率低下。
from multiprocessing import Pool
import os
import os.path
Parse_OUT="/opt/out/"
Receive_Dir="/opt/receive/"
def parser(infile):
out_dir=date_of(filename)
if not os.path.exists(out_dir):
os.mkdir(out_dir)
fout=gzip.open(out_dir+'/'+filename+'csv.gz','wb')
with gzip.open(infile) as fin:
for line in fin:
data=line.split(',')
fout.write(data)
fout.close()
os.remove(infile)
if __name__ == '__main__':
pool=Pool(20)
while True:
targets=glob.glob(Receive_Dir)[:10]
pool.map(parser, targets)
pool.close()
我看到几个问题:
if not os.path.exists(out_dir): os.mkdir(out_dir)
:这是一个竞争条件。如果两个 worker 试图同时创建同一个目录,其中一个将引发异常。不要做 if 条件。只需调用os.makedirs(out_dir, exist_ok=True)
不要assemble文件路径加上字符串。只需执行
os.path.join(out_dir, filename+'csv.gz')
。这更干净并且故障状态更少即使没有新目录出现,也不必在 while True 循环中旋转,您可以使用 Linux 上的 inotify 机制来监视目录的更改。如果确实有任何事情要做,那只会唤醒您的流程。查看pyinotify:https://github.com/seb-m/pyinotify
既然您提到您对批处理不满意:您可以使用
pool.apply_async
在新操作可用时启动它们。您的主循环不会对结果做任何事情,因此您可以“即发即忘”顺便说一下,为什么要启动一个有 20 个工作线程的池,然后一次只启动 10 个目录操作?