将元组列表映射到 python 中的多处理池对象
mapping a list of tuples to the multiprocessing pool object in python
我在使用以下格式的代码时遇到问题,并假设错误与我尝试访问每个元组中的元素的方式有关。
from numberer import numberify
from sys import argv
infile=argv[1]
from multiprocessing import Pool
pool=Pool(15)
import os
def chunker(fob):
chunkbegin=0
filesize=os.stat(fob.name).st_size
while chunkbegin < filesize:
chunkend=chunkbegin+100000
fob.seek(chunkend)
fob.readline()
chunkend=fob.tell()
yield (chunkbegin,chunkend)
chunkbegin=chunkend
def run(tup, fob):
fob.seek(tup[0])
length=int(tup[1])-int(tup[0])
lines=fob.readlines(length)
for line in lines:
print(line)
fob=open(infile)
chunks=[x for x in chunker(fob)]
pool.map(run, (chunks, fob))
准确的错误是:
Process ForkPoolWorker-1:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 337, in get
return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'run' on <module '__main__' from 'pretonumber.py'>
1) 所以当map函数将元组映射到函数时;我假设这些元素应该像普通元组一样被调用? IE只有一个索引?
2) 我传递给函数的元素块 运行: 是一个元组列表:
chunks=[(0,100000),(100000,200000)...] 由生成器分块器创建。
谢谢。
map
方法接受一个可迭代的参数。可迭代对象的每个元素都传递给 run
的一个实例。由于您的可迭代对象是元组 (chunks, fob)
,这将执行 运行 两个任务,在一个任务中调用 run(chunks)
,在另一个任务中调用 run(fob)
。
我想你想做的是 运行 chunks
中每个 chunk
一个任务,调用 run(chunk, fob)
.
因此,首先,您需要一个每个块产生 (chunk, fob)
一次的可迭代对象,例如 ((chunk, fob) for chunk in chunks)
.
但这仍然行不通,因为它将使用一个参数调用 run
,二元组 (chunk, fob)
,而不是两个参数。您可以通过重写或包装 run
以采用单个 2 元组而不是两个单独的参数来解决此问题,或者您可以只使用 starmap
而不是 map
,它会为您包装.
但这仍然行不通。您正试图在进程之间传递一个打开的文件 object,而 multiprocessing
不能这样做。
由于您使用的是 fork
方法,有时您可以从 parent 继承文件 object 而不是传递它,但细节很复杂,你真的需要阅读 multiprocessing
的 Programming guidelines 并了解文件描述符继承在 Unix 上的工作原理。
由于您希望每个 child 都有自己独立的文件 object 副本,因此它们都可以 seek
在其中,最简单的解决方案是只传递文件名并让他们 open
自己:
def run(tup, path):
with open(path) as fob:
fob.seek(tup[0])
length=int(tup[1])-int(tup[0])
lines=fob.readlines(length)
for line in lines:
print(line)
fob = open(infile)
chunks = [x for x in chunker(fob)]
args = ((chunk, infile) for chunk in chunks)
pool.starmap(run, args)
与此同时,既然我们确定我们不依赖于 fork
行为,那么编写代码以使用任何启动方法可能是个好主意。这意味着将 top-level 代码放入 __main__
块中。而且,在我们处理它的同时,让我们确保在完成后关闭文件:
# imports
# function definitions
if __name__ == '__main__':
infile = argv[1]
pool = Pool(15)
with open(infile) as fob:
chunks = [x for x in chunker(fob)]
args = ((chunk, infile) for chunk in chunks)
pool.starmap(run, args)
您的代码中可能还有其他错误,但我认为这会耗尽 multiprocessing
个错误。
我在使用以下格式的代码时遇到问题,并假设错误与我尝试访问每个元组中的元素的方式有关。
from numberer import numberify
from sys import argv
infile=argv[1]
from multiprocessing import Pool
pool=Pool(15)
import os
def chunker(fob):
chunkbegin=0
filesize=os.stat(fob.name).st_size
while chunkbegin < filesize:
chunkend=chunkbegin+100000
fob.seek(chunkend)
fob.readline()
chunkend=fob.tell()
yield (chunkbegin,chunkend)
chunkbegin=chunkend
def run(tup, fob):
fob.seek(tup[0])
length=int(tup[1])-int(tup[0])
lines=fob.readlines(length)
for line in lines:
print(line)
fob=open(infile)
chunks=[x for x in chunker(fob)]
pool.map(run, (chunks, fob))
准确的错误是:
Process ForkPoolWorker-1:
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
task = get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 337, in get
return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'run' on <module '__main__' from 'pretonumber.py'>
1) 所以当map函数将元组映射到函数时;我假设这些元素应该像普通元组一样被调用? IE只有一个索引?
2) 我传递给函数的元素块 运行: 是一个元组列表: chunks=[(0,100000),(100000,200000)...] 由生成器分块器创建。
谢谢。
map
方法接受一个可迭代的参数。可迭代对象的每个元素都传递给 run
的一个实例。由于您的可迭代对象是元组 (chunks, fob)
,这将执行 运行 两个任务,在一个任务中调用 run(chunks)
,在另一个任务中调用 run(fob)
。
我想你想做的是 运行 chunks
中每个 chunk
一个任务,调用 run(chunk, fob)
.
因此,首先,您需要一个每个块产生 (chunk, fob)
一次的可迭代对象,例如 ((chunk, fob) for chunk in chunks)
.
但这仍然行不通,因为它将使用一个参数调用 run
,二元组 (chunk, fob)
,而不是两个参数。您可以通过重写或包装 run
以采用单个 2 元组而不是两个单独的参数来解决此问题,或者您可以只使用 starmap
而不是 map
,它会为您包装.
但这仍然行不通。您正试图在进程之间传递一个打开的文件 object,而 multiprocessing
不能这样做。
由于您使用的是 fork
方法,有时您可以从 parent 继承文件 object 而不是传递它,但细节很复杂,你真的需要阅读 multiprocessing
的 Programming guidelines 并了解文件描述符继承在 Unix 上的工作原理。
由于您希望每个 child 都有自己独立的文件 object 副本,因此它们都可以 seek
在其中,最简单的解决方案是只传递文件名并让他们 open
自己:
def run(tup, path):
with open(path) as fob:
fob.seek(tup[0])
length=int(tup[1])-int(tup[0])
lines=fob.readlines(length)
for line in lines:
print(line)
fob = open(infile)
chunks = [x for x in chunker(fob)]
args = ((chunk, infile) for chunk in chunks)
pool.starmap(run, args)
与此同时,既然我们确定我们不依赖于 fork
行为,那么编写代码以使用任何启动方法可能是个好主意。这意味着将 top-level 代码放入 __main__
块中。而且,在我们处理它的同时,让我们确保在完成后关闭文件:
# imports
# function definitions
if __name__ == '__main__':
infile = argv[1]
pool = Pool(15)
with open(infile) as fob:
chunks = [x for x in chunker(fob)]
args = ((chunk, infile) for chunk in chunks)
pool.starmap(run, args)
您的代码中可能还有其他错误,但我认为这会耗尽 multiprocessing
个错误。