多处理:将文件句柄传递给进程

Multiprocessing: Pass file handle to Process

我的程序生成多个进程来执行一些耗时的计算。然后将结果收集到队列中,写入程序将它们写入输出文件。

下面是我的代码的简化版本,应该可以说明我的问题。如果我在Writerclass中注释掉flush语句,test.out在程序结束时为空

这里到底发生了什么? test.out 是不是没有正常关闭?认为将文件句柄传递给自主进程应该首先起作用是否天真?

from multiprocessing import JoinableQueue, Process

def main():
    queue = JoinableQueue()
    queue.put("hello world!")

    with open("test.out", "w") as outhandle:
        wproc = Writer(queue, outhandle)
        wproc.start()
        queue.join()

    with open("test.out") as handle:
        for line in handle:
            print(line.strip())

class Writer(Process):

    def __init__(self, queue, handle):
        Process.__init__(self)
        self.daemon = True
        self.queue = queue
        self.handle = handle

    def run(self):
        while True:
            msg = self.queue.get()
            print(msg, file=self.handle)
            #self.handle.flush()
            self.queue.task_done()

if __name__ == '__main__':
    main()

编写器是一个单独的进程。它写入文件的数据可能被缓冲,并且因为进程保持 运行,它不知道它应该刷新缓冲区(将其写入文件)。手动冲洗是正确的做法。

通常,当您退出 with 块时,文件会被关闭,这会刷新缓冲区。但是父进程对其子进程的缓冲区一无所知,因此子进程必须刷新自己的缓冲区(关闭文件也应该有效——这不会为父进程关闭文件,至少在 Unix 系统上是这样)。

此外,从多处理 (https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool) 中检查池 class - 它可能会为您节省一些工作。

我在将处理过的合并数据集的输出直接写入文件时遇到了同样的问题。

这是通过首先将合并的结果收集到列表中然后最后写入文件来解决的。

这主要是由于硬盘无法写入处理器处理的速度,缓冲内容丢失或池数据顺序不正确。

最好的办法是先将合并的输出分配到内存位置或变量(字符串或列表),然后写入文件,这样事情就会得到解决。

舔得好!