将元组列表映射到 python 中的多处理池对象

mapping a list of tuples to the multiprocessing pool object in python

我在使用以下格式的代码时遇到问题,并假设错误与我尝试访问每个元组中的元素的方式有关。

from numberer import numberify
from sys import argv
infile=argv[1]
from multiprocessing import Pool
pool=Pool(15)
import os

def chunker(fob):
    chunkbegin=0
    filesize=os.stat(fob.name).st_size
    while chunkbegin < filesize:
        chunkend=chunkbegin+100000
        fob.seek(chunkend)
        fob.readline()
        chunkend=fob.tell()
        yield (chunkbegin,chunkend)
        chunkbegin=chunkend

def run(tup, fob):
    fob.seek(tup[0])
    length=int(tup[1])-int(tup[0])
    lines=fob.readlines(length)
    for line in lines:
        print(line)

fob=open(infile)
chunks=[x for x in chunker(fob)]
pool.map(run, (chunks, fob))

准确的错误是:

Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 337, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'run' on <module '__main__' from 'pretonumber.py'>

1) 所以当map函数将元组映射到函数时;我假设这些元素应该像普通元组一样被调用? IE只有一个索引?

2) 我传递给函数的元素块 运行: 是一个元组列表: chunks=[(0,100000),(100000,200000)...] 由生成器分块器创建。

谢谢。

map 方法接受一个可迭代的参数。可迭代对象的每个元素都传递给 run 的一个实例。由于您的可迭代对象是元组 (chunks, fob),这将执行 运行 两个任务,在一个任务中调用 run(chunks),在另一个任务中调用 run(fob)


我想你想做的是 运行 chunks 中每个 chunk 一个任务,调用 run(chunk, fob).

因此,首先,您需要一个每个块产生 (chunk, fob) 一次的可迭代对象,例如 ((chunk, fob) for chunk in chunks).


但这仍然行不通,因为它将使用一个参数调用 run,二元组 (chunk, fob),而不是两个参数。您可以通过重写或包装 run 以采用单个 2 元组而不是两个单独的参数来解决此问题,或者您可以只使用 starmap 而不是 map,它会为您包装.


但这仍然行不通。您正试图在进程之间传递一个打开的文件 object,而 multiprocessing 不能这样做。

由于您使用的是 fork 方法,有时您可以从 parent 继承文件 object 而不是传递它,但细节很复杂,你真的需要阅读 multiprocessingProgramming guidelines 并了解文件描述符继承在 Unix 上的工作原理。

由于您希望每个 child 都有自己独立的文件 object 副本,因此它们都可以 seek 在其中,最简单的解决方案是只传递文件名并让他们 open 自己:

def run(tup, path):
    with open(path) as fob:
        fob.seek(tup[0])
        length=int(tup[1])-int(tup[0])
        lines=fob.readlines(length)
        for line in lines:
            print(line)

fob = open(infile)
chunks = [x for x in chunker(fob)]
args = ((chunk, infile) for chunk in chunks)
pool.starmap(run, args)

与此同时,既然我们确定我们不依赖于 fork 行为,那么编写代码以使用任何启动方法可能是个好主意。这意味着将 top-level 代码放入 __main__ 块中。而且,在我们处理它的同时,让我们确保在完成后关闭文件:

# imports
# function definitions
if __name__ == '__main__':
    infile = argv[1]
    pool = Pool(15)
    with open(infile) as fob:
        chunks = [x for x in chunker(fob)]
    args = ((chunk, infile) for chunk in chunks)
    pool.starmap(run, args)

您的代码中可能还有其他错误,但我认为这会耗尽 multiprocessing 个错误。