Python multiprocessing.Pool: AttributeError

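I have a method in a class that needs to do a lot of work in a loop, and I would like to spread the work over all of my cores.

I wrote the following code, which works if I use the normal map(), but pool.map() returns an error.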

import multiprocessing
pool = multiprocessing.Pool(multiprocessing.cpu_count() - 1)

class OtherClass:
  def run(self, sentence, graph):
    return False

class SomeClass:
  def __init__(self):
    self.sentences = [["Some string"]]
    self.graphs = ["string"]

  def some_method(self):
      other = OtherClass()

      def single(params):
          sentences, graph = params
          return [other.run(sentence, graph) for sentence in sentences]

      return list(pool.map(single, zip(self.sentences, self.graphs)))


SomeClass().some_method()

Error 1:

AttributeError: Can't pickle local object 'SomeClass.some_method.<locals>.single'

Why can't it pickle single()? I even tried moving single() to the global module scope (outside the class, so that it is independent of the context):

import multiprocessing
pool = multiprocessing.Pool(multiprocessing.cpu_count() - 1)

class OtherClass:
  def run(self, sentence, graph):
    return False


def single(params):
    other = OtherClass()
    sentences, graph = params
    return [other.run(sentence, graph) for sentence in sentences]

class SomeClass:
  def __init__(self):
    self.sentences = [["Some string"]]
    self.graphs = ["string"]

  def some_method(self):
      return list(pool.map(single, zip(self.sentences, self.graphs)))


SomeClass().some_method()

I get the following...

Error 2:

AttributeError: Can't get attribute 'single' on <module '__main__' from '.../test.py'>

Error 1:

AttributeError: Can't pickle local object 'SomeClass.some_method.<locals>.single'

You solved this error yourself by moving the nested target function single() to the top level.

Background:

Pool needs to pickle (serialize) everything it sends to its worker processes (IPC). Pickling actually only saves the name of a function, and unpickling requires re-importing the function by name. For that to work, the function needs to be defined at the top level: nested functions won't be importable by the child, and merely trying to pickle them already raises an exception.
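A minimal sketch of that difference using the standard pickle module directly (the names top_level and make_nested are hypothetical). A top-level function round-trips because pickle stores only a reference to its module and qualified name; a nested function's qualified name contains `<locals>`, which cannot be looked up, so pickling fails immediately, just like Error 1.

```python
import pickle


def top_level(x):
    return x * 2


def make_nested():
    def nested(x):
        return x * 2
    return nested


# Pickle stores a reference (module + qualified name), not the code itself,
# so unpickling looks the function up again and returns the same object.
restored = pickle.loads(pickle.dumps(top_level))
print(restored is top_level)  # True

nested = make_nested()
print(nested.__qualname__)  # make_nested.<locals>.nested

# The '<locals>' part cannot be resolved by name, so pickling fails at once.
# The exact exception type differs between Python versions, so catch both.
nested_error = None
try:
    pickle.dumps(nested)
except (AttributeError, pickle.PicklingError) as exc:
    nested_error = exc
print(type(nested_error).__name__, nested_error)
```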


Error 2:

AttributeError: Can't get attribute 'single' on <module '__main__' from '.../test.py'>

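You start the pool before you define your functions and classes. With the 'fork' start method the worker processes are created at that point, so they cannot inherit any code defined afterwards. Move the pool start-up to the bottom and protect it with if __name__ == '__main__':
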
import multiprocessing


class OtherClass:
    def run(self, sentence, graph):
        return False


def single(params):
    # Top-level function: the pool can pickle it by name and the workers
    # can find it again under the same name.
    other = OtherClass()
    sentences, graph = params
    return [other.run(sentence, graph) for sentence in sentences]


class SomeClass:
    def __init__(self):
        self.sentences = [["Some string"]]
        self.graphs = ["string"]

    def some_method(self):
        # `pool` is the module-level name bound in the __main__ block below.
        return list(pool.map(single, zip(self.sentences, self.graphs)))


if __name__ == '__main__':  # <- prevent RuntimeError for 'spawn'
    # and 'forkserver' start_methods; max(1, ...) avoids 0 workers on 1 core
    with multiprocessing.Pool(max(1, multiprocessing.cpu_count() - 1)) as pool:
        print(SomeClass().some_method())
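In the fixed code, single() builds its own OtherClass() on every call instead of reusing the `other` that the nested version captured. If that object should come from the caller, one option (a sketch of an alternative, not part of the answer above) is to bind it with functools.partial, since a partial of a top-level function with picklable arguments is itself picklable, and to pass the pool into the method rather than relying on a module-level global:

```python
import functools
import multiprocessing


class OtherClass:
    def run(self, sentence, graph):
        return False


def single(other, params):
    # Top-level, so pickle can reference it by name. The object the nested
    # version captured from its enclosing scope is now an explicit argument.
    sentences, graph = params
    return [other.run(sentence, graph) for sentence in sentences]


class SomeClass:
    def __init__(self):
        self.sentences = [["Some string"]]
        self.graphs = ["string"]

    def some_method(self, pool):
        # partial() pickles as a reference to `single` plus its bound
        # arguments, so the OtherClass instance must be picklable too.
        task = functools.partial(single, OtherClass())
        return pool.map(task, zip(self.sentences, self.graphs))


if __name__ == "__main__":
    n_workers = max(1, multiprocessing.cpu_count() - 1)
    with multiprocessing.Pool(n_workers) as pool:
        print(SomeClass().some_method(pool))  # [[False]]
```

Note that the bound OtherClass instance is pickled again with every chunk, so each worker operates on its own copy rather than a shared object.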

Appendix

...I would like to spread the work over all of my cores.

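Potentially helpful background on how multiprocessing.Pool chunks the work: map() does not send items to the workers one at a time, it splits the input into chunks and sends each chunk to a worker as a single task.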
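A minimal sketch of the default chunk size, assuming it mirrors the calculation in CPython's internal Pool._map_async (an implementation detail that may change between versions; the helper names default_chunksize and split_into_chunks are hypothetical). Without an explicit chunksize, map() aims for roughly four chunks per worker and rounds the chunk size up:

```python
def default_chunksize(n_items: int, n_workers: int) -> int:
    # Mirrors CPython's calculation when Pool.map is called with
    # chunksize=None; not a documented API.
    if n_items == 0:
        return 0
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


def split_into_chunks(items, chunksize):
    # Each chunk becomes one task that a single worker processes in order.
    if not items:
        return []
    return [items[i:i + chunksize] for i in range(0, len(items), chunksize)]


items = list(range(100))
size = default_chunksize(len(items), n_workers=3)
chunks = split_into_chunks(items, size)
print(size)         # 9
print(len(chunks))  # 12
```

Passing chunksize= to Pool.map overrides this: larger chunks mean less IPC overhead per item but coarser load balancing between workers.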

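I accidentally discovered a very nasty solution. It works as long as you use a def statement: declare the function you want to pass to Pool.map with the global keyword at the beginning of the method that defines it. It relies on the 'fork' start method (POSIX only) and on the pool being created after that function has been defined, because the worker processes must inherit the module-level name. I would not rely on this in serious applications.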

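import multiprocessing


class OtherClass:
    def run(self, sentence, graph):
        return False


class SomeClass:
    def __init__(self):
        self.sentences = [["Some string"]]
        self.graphs = ["string"]

    def some_method(self):
        global single  # This is ugly, but does the trick XD

        other = OtherClass()

        def single(params):
            sentences, graph = params
            return [other.run(sentence, graph) for sentence in sentences]

        # The workers must be forked *after* `single` exists as a module-level
        # name, so the pool is created here with the 'fork' start method
        # (POSIX only); 'spawn'/'forkserver' workers would not see `single`.
        ctx = multiprocessing.get_context("fork")
        with ctx.Pool(max(1, multiprocessing.cpu_count() - 1)) as pool:
            return pool.map(single, zip(self.sentences, self.graphs))


if __name__ == '__main__':
    print(SomeClass().some_method())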
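Why the global declaration gets past Error 1: since Python 3.4, a function defined with def inside another function but declared global there gets a plain `__qualname__`, so pickle can store and look it up by name like any top-level function. It does not help on the worker side, which still needs that module-level name to exist in the child process. A small sketch (make_local, make_global and the helper names are hypothetical):

```python
import pickle


def make_local():
    def helper(x):
        return x + 1
    return helper


def make_global():
    global helper_global  # binds the nested def to a module-level name
    def helper_global(x):
        return x + 1
    return helper_global


local_fn = make_local()
global_fn = make_global()

print(local_fn.__qualname__)   # make_local.<locals>.helper
print(global_fn.__qualname__)  # helper_global

# Pickle stores (module, qualname); only the global one can be found again.
restored = pickle.loads(pickle.dumps(global_fn))
print(restored is global_fn)  # True
```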