Python: 'before' and 'after' for multiprocessing workers

Update: here is a more concrete example.

Suppose I want to compile some statistics from a fairly large set of files: I can make a generator (line for line in fileinput.input(files)) and some processor:

from collections import defaultdict 
scores = defaultdict(int) 

def process(line): 
    if 'Result' in line: 
        res = line.split('\"')[1].split('-')[0]
        scores[res] += 1

The question is how to handle this once multiprocessing.Pool enters the picture.

Of course it is possible to define a multiprocessing.sharedctypes object with a custom struct instead of the defaultdict, but that seems rather painful. On the other hand, I cannot think of a pythonic way to instantiate something in each process before it starts working, or to return something to the main process after the generator has run dry.
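For the "before" half there is a standard hook: Pool's `initializer` argument runs once in every worker process before it receives any tasks. A minimal sketch (the function names here are illustrative, not from the original post); note that there is no symmetric built-in "after" hook, which is the crux of the question, so each task returns its result and the manager merges:

```python
from collections import defaultdict
from multiprocessing import Pool

state = None  # per-worker global, set up by the initializer


def init_worker():
    # "before": executed once in each worker process, before any tasks.
    global state
    state = defaultdict(int)


def process(line):
    # Each task returns its result; the manager merges them, since the
    # per-worker `state` cannot be shipped back without an "after" hook.
    if 'Result' in line:
        return line.split('"')[1].split('-')[0]
    return None


def tally(lines):
    with Pool(processes=2, initializer=init_worker) as pool:
        results = pool.map(process, lines)
    scores = defaultdict(int)
    for res in results:
        if res is not None:
            scores[res] += 1
    return dict(scores)
```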

So you are essentially building a histogram. This parallelizes easily, because histograms can be merged without complication. One might call this problem trivially parallelizable, or "embarrassingly parallel": there is no need to worry about communication among the workers.
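That histograms merge without complication is easy to check with `collections.Counter`, which adds counts key-wise:

```python
from collections import Counter

a = Counter({"wolf": 2, "cat": 1})
b = Counter({"cat": 3, "blume": 1})
a.update(b)  # merge in place: counts for shared keys add up
# a is now Counter({'cat': 4, 'wolf': 2, 'blume': 1})
```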

Just split your data set into chunks, let your workers process those chunks independently, collect the histogram from each worker, and then merge the histograms.

In practice, this problem is best solved by letting each worker process/read its own file. That is, a "task" can simply be a file name. You should not start pickling file contents and sending them between processes through pipes. Let each worker process retrieve its bulk data directly from the files. Otherwise your architecture spends too much time on inter-process communication instead of doing real work.

Do you need an example, or can you figure it out yourself?

Edit: example implementation

I have a number of data files whose names follow this pattern: data0.txt, data1.txt, ...

Example contents:

wolf
wolf
cat
blume
eisenbahn

The goal is to build a histogram of the words contained in the data files. Here is the code:

from multiprocessing import Pool
from collections import Counter
import glob


def build_histogram(filepath):
    """This function is run by a worker process.
    The `filepath` argument is communicated to the worker
    through a pipe. The return value of this function is
    communicated to the manager through a pipe.
    """
    hist = Counter()
    with open(filepath) as f:
        for line in f:
            hist[line.strip()] += 1
    return hist


def main():
    """This function runs in the manager (main) process."""

    # Collect paths to data files.
    datafile_paths = glob.glob("data*.txt")

    # Create a pool of worker processes and distribute work.
    # The input to worker processes (function argument) as well
    # as the output by worker processes is transmitted through
    # pipes, behind the scenes.
    pool = Pool(processes=3)
    histograms = pool.map(build_histogram, datafile_paths)

    # Properly shut down the pool of worker processes, and
    # wait until all of them have finished.
    pool.close()
    pool.join()

    # Merge sub-histograms. Do not create too many intermediate
    # objects: update the first sub-histogram with the others.
    # Relevant docs: collections.Counter.update
    merged_hist = histograms[0]
    for h in histograms[1:]:
        merged_hist.update(h)

    for word, count in merged_hist.items():
        print("%s: %s" % (word, count))


if __name__ == "__main__":
    main()

Test output:

python countwords.py
eisenbahn: 12
auto: 6
cat: 1
katze: 10
stadt: 1
wolf: 3
zug: 4
blume: 5
herbert: 14
destruction: 4

I had to modify the original pool.py (the problem is that the worker is defined as a plain function without any hooks to extend) to get what I wanted, but it was not too bad, and probably better than writing a whole new pool from scratch.

class worker(object):
    # Modified from the worker() function in multiprocessing/pool.py:
    # `initializer` and `finalizer` now receive the worker object, so
    # per-worker state can be set up before the task loop and torn
    # down (or shipped back) after it.
    def __init__(self, inqueue, outqueue, initializer=None, initargs=(),
                 maxtasks=None, wrap_exception=False,
                 finalizer=None, finargs=()):
        assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
        put = outqueue.put
        get = inqueue.get
        self.completed = 0
        if hasattr(inqueue, '_writer'):
            inqueue._writer.close()
            outqueue._reader.close()
        if initializer is not None:
            initializer(self, *initargs)

        def run(self): 
            while maxtasks is None or (maxtasks and self.completed < maxtasks):
                try:
                    task = get()
                except (EOFError, OSError):
                    util.debug('worker got EOFError or OSError -- exiting')
                    break

                if task is None:
                    util.debug('worker got sentinel -- exiting')
                    break

                job, i, func, args, kwds = task
                try:
                    result = (True, func(*args, **kwds))
                except Exception as e:
                    if wrap_exception:
                        e = ExceptionWithTraceback(e, e.__traceback__)
                    result = (False, e)
                try:
                    put((job, i, result))
                except Exception as e:
                    wrapped = MaybeEncodingError(e, result[1])
                    util.debug("Possible encoding error while sending result: %s" % (
                        wrapped))
                    put((job, i, (False, wrapped)))
                self.completed += 1
            if finalizer:
                finalizer(self, *finargs)
            util.debug('worker exiting after %d tasks' % self.completed)
        run(self)
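If patching pool.py feels too invasive, the same before/after semantics can be had with plain `multiprocessing.Process` workers and queues. In this sketch (function names are illustrative) the setup runs once before the task loop, and the accumulated state is shipped back to the manager afterwards:

```python
from collections import Counter
from multiprocessing import Process, Queue


def worker(task_queue, result_queue):
    # "before": per-process setup runs once, here.
    hist = Counter()
    while True:
        path = task_queue.get()
        if path is None:          # sentinel: no more tasks
            break
        with open(path) as f:
            for line in f:
                hist[line.strip()] += 1
    # "after": per-process teardown; ship the accumulated state back.
    result_queue.put(hist)


def run(paths, nworkers=3):
    tasks, results = Queue(), Queue()
    procs = [Process(target=worker, args=(tasks, results))
             for _ in range(nworkers)]
    for p in procs:
        p.start()
    for path in paths:
        tasks.put(path)
    for _ in procs:
        tasks.put(None)           # one sentinel per worker
    merged = Counter()
    for _ in procs:
        merged.update(results.get())
    for p in procs:
        p.join()
    return merged
```

Draining the result queue before joining the workers matters: a worker may block on its queue's feeder thread until its result has been consumed.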