获取 "Queue objects should only be shared between processes through inheritance" 但我没有使用队列
Getting "Queue objects should only be shared between processes through inheritance" but I'm not using a Queue
我正在尝试使用 ProcessPoolExecutor,但出现错误 "Queue objects should only be shared between processes through inheritance",但我没有使用队列(至少没有明确使用)。我找不到任何可以解释我做错了什么的东西。
下面是一些演示问题的代码(不是我的实际代码):
from concurrent.futures import ProcessPoolExecutor, as_completed
class WhyDoesntThisWork:
def __init__(self):
self.executor = ProcessPoolExecutor(4)
def execute_something(self, starting_letter):
futures = [self.executor.submit(self.something, starting_letter, d) for d in range(4)]
letter = None
for future in as_completed(futures):
letter = future.result()
print(letter)
def something(self, letter, d):
# do something pointless for the example
for x in range(d):
letter = chr(ord(letter) + 1)
if __name__ == '__main__':
WhyDoesntThisWork(). execute_something('A')
El Ruso 指出将 something() 设为静态方法或类方法会使错误消失。不幸的是,我的实际代码需要使用 self.
调用其他方法
尝试使用此代码 something
@staticmethod
def something(letter, d):
# do something pointless for the example
for x in range(d):
letter = chr(ord(letter) + 1)
或重构为:
from concurrent.futures import ProcessPoolExecutor, as_completed
class WhyDoesntThisWork:
def something(self, letter, d):
# do something pointless for the example
for x in range(d):
letter = chr(ord(letter) + 1)
return letter
if __name__ == '__main__':
executor = ProcessPoolExecutor(4)
letter = 'A'
obj = WhyDoesntThisWork()
futures = [executor.submit(obj.something, letter, d) for d in range(4)]
for future in as_completed(futures):
print(future.result())
不用静态方法也能解决。
使用进程时,每个进程运行在独立的内存中space。这与使用线程时不同,不同的线程 运行 在同一个进程下,使用相同的内存 space。因此,当您使用时不会发生错误
ThreadPoolExecutor
但出现在 ProcessPoolExecutor
.
所以当class实例的函数被传递到单独的子进程中时,多处理机制会对该函数进行pickle,以便该函数可以作为一个独立的实例传递到子进程中。当子流程加入时,class 由函数实例更新为未拾取的实例。
要使其工作,只需将 __getstate__()
和 __setstate__()
函数添加到 class 以指导 class 如何 pickle 和 unpickle 函数。在 pickling 中,可以排除不需要的字段,如 del self_dict['executor']
.
所示
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
class GuessItWorksNow():
def __init__(self):
self.executor = ProcessPoolExecutor(4)
def __getstate__(self):
state = self.__dict__.copy()
del state['executor']
return state
def __setstate__(self, state):
self.__dict__.update(state)
def something(self, letter, d):
# do something pointless for the example
p = multiprocessing.current_process()
time.sleep(1)
for x in range(d):
letter = chr(ord(letter) + 1)
return (f'[{p.pid}] ({p.name}) ({letter})')
def execute_something(self, starting_letter):
futures = [self.executor.submit(self.something, starting_letter, d) for d in range(10)]
for future in as_completed(futures):
print(future.result())
if __name__ == '__main__':
obj = GuessItWorksNow()
obj.execute_something('A')
我正在尝试使用 ProcessPoolExecutor,但出现错误 "Queue objects should only be shared between processes through inheritance",但我没有使用队列(至少没有明确使用)。我找不到任何可以解释我做错了什么的东西。
下面是一些演示问题的代码(不是我的实际代码):
from concurrent.futures import ProcessPoolExecutor, as_completed
class WhyDoesntThisWork:
def __init__(self):
self.executor = ProcessPoolExecutor(4)
def execute_something(self, starting_letter):
futures = [self.executor.submit(self.something, starting_letter, d) for d in range(4)]
letter = None
for future in as_completed(futures):
letter = future.result()
print(letter)
def something(self, letter, d):
# do something pointless for the example
for x in range(d):
letter = chr(ord(letter) + 1)
if __name__ == '__main__':
WhyDoesntThisWork(). execute_something('A')
El Ruso 指出将 something() 设为静态方法或类方法会使错误消失。不幸的是,我的实际代码需要使用 self.
调用其他方法尝试使用此代码 something
@staticmethod
def something(letter, d):
# do something pointless for the example
for x in range(d):
letter = chr(ord(letter) + 1)
或重构为:
from concurrent.futures import ProcessPoolExecutor, as_completed
class WhyDoesntThisWork:
def something(self, letter, d):
# do something pointless for the example
for x in range(d):
letter = chr(ord(letter) + 1)
return letter
if __name__ == '__main__':
executor = ProcessPoolExecutor(4)
letter = 'A'
obj = WhyDoesntThisWork()
futures = [executor.submit(obj.something, letter, d) for d in range(4)]
for future in as_completed(futures):
print(future.result())
不用静态方法也能解决。
使用进程时,每个进程运行在独立的内存中space。这与使用线程时不同,不同的线程 运行 在同一个进程下,使用相同的内存 space。因此,当您使用时不会发生错误
ThreadPoolExecutor
但出现在 ProcessPoolExecutor
.
所以当class实例的函数被传递到单独的子进程中时,多处理机制会对该函数进行pickle,以便该函数可以作为一个独立的实例传递到子进程中。当子流程加入时,class 由函数实例更新为未拾取的实例。
要使其工作,只需将 __getstate__()
和 __setstate__()
函数添加到 class 以指导 class 如何 pickle 和 unpickle 函数。在 pickling 中,可以排除不需要的字段,如 del self_dict['executor']
.
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
class GuessItWorksNow():
def __init__(self):
self.executor = ProcessPoolExecutor(4)
def __getstate__(self):
state = self.__dict__.copy()
del state['executor']
return state
def __setstate__(self, state):
self.__dict__.update(state)
def something(self, letter, d):
# do something pointless for the example
p = multiprocessing.current_process()
time.sleep(1)
for x in range(d):
letter = chr(ord(letter) + 1)
return (f'[{p.pid}] ({p.name}) ({letter})')
def execute_something(self, starting_letter):
futures = [self.executor.submit(self.something, starting_letter, d) for d in range(10)]
for future in as_completed(futures):
print(future.result())
if __name__ == '__main__':
obj = GuessItWorksNow()
obj.execute_something('A')