Python: 'before' and 'after' for multiprocessing workers
Update: here is a more concrete example.
Suppose I want to compile some statistics from a fairly large set of files:
I can make a generator (line for line in fileinput.input(files))
and some processor:
from collections import defaultdict

scores = defaultdict(int)

def process(line):
    if 'Result' in line:
        res = line.split('"')[1].split('-')[0]
        scores[res] += 1
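For reference, run sequentially this is just (the file names here are placeholders):

import fileinput

files = ['log0.txt', 'log1.txt']        # placeholder file names
for line in fileinput.input(files):     # the generator, consumed line by line
    process(line)                       # fills the shared `scores` dict
print(dict(scores))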
The question is how to handle this once multiprocessing.Pool enters the picture.
Of course it is possible to define a multiprocessing.sharedctypes object together with a custom struct instead of the defaultdict, but that seems rather painful. On the other hand, I cannot think of a pythonic way to instantiate something before a process starts, or to return something to the main thread after the generator has run out.
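The closest built-in hook I can find is Pool's initializer argument, which covers the "before" half (a rough sketch; init_worker is just a name I made up), but there seems to be no symmetric "after" hook to collect each worker's scores:

from multiprocessing import Pool
from collections import defaultdict

scores = None   # per-worker global, filled in by the initializer

def init_worker():
    # "before": runs once in every worker process when it starts
    global scores
    scores = defaultdict(int)

pool = Pool(processes=4, initializer=init_worker)
# ... map the line/file processing here ...
pool.close()
pool.join()
# There is no finalizer counterpart, so each worker's `scores` stays stranded
# in its own process -- which is exactly the problem.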
So you are basically creating a histogram. This is easy to parallelize, because histograms can be merged without any complication. One might want to say that this problem is trivially parallelizable, or "embarrassingly parallel": there is no need to worry about communication between workers.
Just split your data set into chunks, let your workers process those chunks independently, collect each worker's histogram, and then merge the histograms.
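For example, with collections.Counter (a natural way to represent such a histogram) merging is a one-liner; the counts below are made up:

from collections import Counter

h1 = Counter({'wolf': 2, 'cat': 1})
h2 = Counter({'wolf': 1, 'blume': 3})

merged = h1 + h2     # Counter({'wolf': 3, 'blume': 3, 'cat': 1})
h1.update(h2)        # or merge in place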
In practice, having each worker process/read its own file solves this nicely. That is, a "task" can simply be a file name. You should not start pickling file contents and sending them between processes through pipes; let each worker process retrieve the bulk data directly from its file. Otherwise the architecture spends too much time on inter-process communication instead of doing real work.
Do you need an example, or can you figure it out yourself?
Edit: example implementation
I have a number of data files with file names in this format: data0.txt, data1.txt, ...
Example contents:
wolf
wolf
cat
blume
eisenbahn
The goal is to create a histogram of the words contained in the data files. Here is the code:
from multiprocessing import Pool
from collections import Counter
import glob


def build_histogram(filepath):
    """This function is run by a worker process.

    The `filepath` argument is communicated to the worker
    through a pipe. The return value of this function is
    communicated to the manager through a pipe.
    """
    hist = Counter()
    with open(filepath) as f:
        for line in f:
            hist[line.strip()] += 1
    return hist


def main():
    """This function runs in the manager (main) process."""
    # Collect paths to data files.
    datafile_paths = glob.glob("data*.txt")

    # Create a pool of worker processes and distribute work.
    # The input to worker processes (function argument) as well
    # as the output by worker processes is transmitted through
    # pipes, behind the scenes.
    pool = Pool(processes=3)
    histograms = pool.map(build_histogram, datafile_paths)

    # Properly shut down the pool of worker processes, and
    # wait until all of them have finished.
    pool.close()
    pool.join()

    # Merge sub-histograms. Do not create too many intermediate
    # objects: update the first sub-histogram with the others.
    # Relevant docs: collections.Counter.update
    merged_hist = histograms[0]
    for h in histograms[1:]:
        merged_hist.update(h)

    for word, count in merged_hist.items():
        print("%s: %s" % (word, count))


if __name__ == "__main__":
    main()
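If the number of files is very large, one variation (not what the code above does, just a sketch) is to merge the sub-histograms as they arrive using imap_unordered, so the manager never holds all of them in memory at once:

# build_histogram as defined above
merged_hist = Counter()
pool = Pool(processes=3)
for hist in pool.imap_unordered(build_histogram, glob.glob("data*.txt")):
    merged_hist.update(hist)   # merge each result as soon as it is ready
pool.close()
pool.join()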
Test output:
python countwords.py
eisenbahn: 12
auto: 6
cat: 1
katze: 10
stadt: 1
wolf: 3
zug: 4
blume: 5
herbert: 14
destruction: 4
I had to modify the original pool.py to get what I wanted (the trouble is that worker is defined as a method without any inheritance), but it is not so bad, and probably better than writing a whole new pool.
# This replaces the module-level worker() function in multiprocessing/pool.py,
# so util, ExceptionWithTraceback and MaybeEncodingError are already in scope there.
class worker(object):
    def __init__(self, inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
                 wrap_exception=False, finalizer=None, finargs=()):
        assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
        put = outqueue.put
        get = inqueue.get
        self.completed = 0
        if hasattr(inqueue, '_writer'):
            inqueue._writer.close()
            outqueue._reader.close()
        if initializer is not None:
            initializer(self, *initargs)

        # run is a closure over get/put/maxtasks/... ; it is called at the
        # end of __init__, so constructing the object runs the task loop.
        def run(self):
            while maxtasks is None or (maxtasks and self.completed < maxtasks):
                try:
                    task = get()
                except (EOFError, OSError):
                    util.debug('worker got EOFError or OSError -- exiting')
                    break
                if task is None:
                    util.debug('worker got sentinel -- exiting')
                    break
                job, i, func, args, kwds = task
                try:
                    result = (True, func(*args, **kwds))
                except Exception as e:
                    if wrap_exception:
                        e = ExceptionWithTraceback(e, e.__traceback__)
                    result = (False, e)
                try:
                    put((job, i, result))
                except Exception as e:
                    wrapped = MaybeEncodingError(e, result[1])
                    util.debug("Possible encoding error while sending result: %s" % (
                        wrapped))
                    put((job, i, (False, wrapped)))
                self.completed += 1
            if finalizer:
                finalizer(self, *finargs)
            util.debug('worker exiting after %d tasks' % self.completed)

        run(self)
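Assuming the Pool class is also changed to pass finalizer/finargs through to this worker (not shown above), the "before"/"after" hooks could then be used along these lines; init_scores, flush_scores and result_queue are hypothetical names, and the queue is assumed to be inherited by the workers via the default fork start method:

from collections import Counter
from multiprocessing import Queue

result_queue = Queue()   # hypothetical: created in the manager before the pool forks

def init_scores(worker_self):
    # "before": receives the worker instance, runs once per worker process
    worker_self.scores = Counter()

def flush_scores(worker_self):
    # "after": runs once per worker, after its last task
    result_queue.put(worker_self.scores)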