多处理在 Ubuntu 中有效,在 Windows 中无效
Multiprocessing works in Ubuntu, doesn't in Windows
我正在尝试使用 this example 作为我的 cherrypy 应用程序排队系统的模板。
我能够将它从 python 2 转换为 python 3(将 from Queue import Empty
更改为 from queue import Empty
)并在 Ubuntu 中执行它。但是当我在 Windows 中执行它时,出现以下错误:
F:\workspace\test>python test.py
Traceback (most recent call last):
File "test.py", line 112, in <module>
broker.start()
File "C:\Anaconda3\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\Anaconda3\lib\multiprocessing\context.py", line 212, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Anaconda3\lib\multiprocessing\context.py", line 313, in _Popen
return Popen(process_obj)
File "C:\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
reduction.dump(process_obj, to_child)
File "C:\Anaconda3\lib\multiprocessing\reduction.py", line 59, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object
F:\workspace\test>Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Anaconda3\lib\multiprocessing\spawn.py", line 100, in spawn_main
new_handle = steal_handle(parent_pid, pipe_handle)
File "C:\Anaconda3\lib\multiprocessing\reduction.py", line 81, in steal_handle
_winapi.PROCESS_DUP_HANDLE, False, source_pid)
OSError: [WinError 87] The parameter is incorrect
完整代码如下:
# from http://www.defuze.org/archives/198-managing-your-process-with-the-cherrypy-bus.html
import sys
import logging
from logging import handlers
from cherrypy.process import wspbus
class MyBus(wspbus.Bus):
def __init__(self, name=""):
wspbus.Bus.__init__(self)
self.open_logger(name)
self.subscribe("log", self._log)
def exit(self):
wspbus.Bus.exit(self)
self.close_logger()
def open_logger(self, name=""):
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
h = logging.StreamHandler(sys.stdout)
h.setLevel(logging.INFO)
h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s"))
logger.addHandler(h)
self.logger = logger
def close_logger(self):
for handler in self.logger.handlers:
handler.flush()
handler.close()
def _log(self, msg="", level=logging.INFO):
self.logger.log(level, msg)
import random
import string
from multiprocessing import Process
class Bank(object):
def __init__(self, queue):
self.bus = MyBus(Bank.__name__)
self.queue = queue
self.bus.subscribe("main", self.randomly_place_order)
self.bus.subscribe("exit", self.terminate)
def randomly_place_order(self):
order = random.sample(['BUY', 'SELL'], 1)[0]
code = random.sample(string.ascii_uppercase, 4)
amount = random.randint(0, 100)
message = "%s %s %d" % (order, ''.join(code), amount)
self.bus.log("Placing order: %s" % message)
self.queue.put(message)
def run(self):
self.bus.start()
self.bus.block(interval=0.01)
def terminate(self):
self.bus.unsubscribe("main", self.randomly_place_order)
self.bus.unsubscribe("exit", self.terminate)
from queue import Empty
class Broker(Process):
def __init__(self, queue):
Process.__init__(self)
self.queue = queue
self.bus = MyBus(Broker.__name__)
self.bus.subscribe("main", self.check)
def check(self):
try:
message = self.queue.get_nowait()
except Empty:
return
if message == "stop":
self.bus.unsubscribe("main", self.check)
self.bus.exit()
elif message.startswith("BUY"):
self.buy(*message.split(' ', 2)[1:])
elif message.startswith("SELL"):
self.sell(*message.split(' ', 2)[1:])
def run(self):
self.bus.start()
self.bus.block(interval=0.01)
def stop(self):
self.queue.put("stop")
def buy(self, code, amount):
self.bus.log("BUY order placed for %s %s" % (amount, code))
def sell(self, code, amount):
self.bus.log("SELL order placed for %s %s" % (amount, code))
if __name__ == '__main__':
from multiprocessing import Queue
queue = Queue()
broker = Broker(queue)
broker.start()
bank = Bank(queue)
bank.run()
问题是 MyBus
对象的某些部分不可 picklable,并且您正在将 MyBus
的实例保存到 Broker
实例。因为 Windows 缺少 fork()
支持,当您调用 broker.start()
时,broker
的整个状态必须在 multiprocessing
产生的子进程中被 pickle 并重新创建以执行broker.run
。它适用于 Linux,因为 Linux 支持 fork
;在这种情况下它不需要 pickle 任何东西——子进程一旦被分叉就会包含父进程的完整状态。
有两种方法可以解决这个问题。第一种也是更困难的方法是使您的 broker
实例可挑选。为此,您需要使 MyBus
可腌制。您现在遇到的错误是指 MyBus
上的 logger
属性,该属性不可 picklable。那个很容易修复;只需在 MyBus
中添加 __getstate__
/__setstate__
方法,用于控制对象如何 pickled/unpickled。如果我们在 pickle 时删除记录器,并在 unpickle 时重新创建它,我们将解决这个问题:
class MyBus(wspbus.Bus):
...
def __getstate__(self):
self_dict = self.__dict__
del self_dict['logger']
return self_dict
def __setstate__(self, d):
self.__dict__.update(d)
self.open_logger()
这行得通,但随后我们遇到了 另一个 酸洗错误:
Traceback (most recent call last):
File "async2.py", line 121, in <module>
broker.start()
File "C:\python34\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\python34\lib\multiprocessing\context.py", line 212, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\python34\lib\multiprocessing\context.py", line 313, in _Popen
return Popen(process_obj)
File "C:\python34\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
reduction.dump(process_obj, to_child)
File "C:\python34\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class 'cherrypy.process.wspbus._StateEnum.State'>: attribute lookup State on cherrypy.process.wspbus failed
原来cherrypy.process.wspbus._StateEnum.State
是[=15=继承的wspbus.Bus
class上的一个属性,是嵌套class,嵌套classes 不能被 pickle:
class _StateEnum(object):
class State(object):
name = None
def __repr__(self):
return "states.%s" % self.name
State
对象(惊喜)用于跟踪 Bus
实例的状态。由于我们在启动总线之前进行 pickling,我们可以在 pickle 时从对象中删除 state
属性,并在我们取消 pickle 时将其设置为 States.STOPPED。
class MyBus(wspbus.Bus):
def __init__(self, name=""):
wspbus.Bus.__init__(self)
self.open_logger(name)
self.subscribe("log", self._log)
def __getstate__(self):
self_dict = self.__dict__
del self_dict['logger']
del self_dict['state']
return self_dict
def __setstate__(self, d):
self.__dict__.update(d)
self.open_logger()
self.state = wspbus.states.STOPPED # Initialize to STOPPED
通过这些更改,代码可以按预期工作!唯一的限制是,如果公共汽车还没有启动,那么 pickle MyBus
是安全的,这对你的用例来说很好。
同样,这是一条艰难的道路。简单的方法是完全不需要 pickle MyBus
实例。您可以只在子进程中创建 MyBus
实例,而不是父进程:
class Broker(Process):
def __init__(self, queue):
Process.__init__(self)
self.queue = queue
...
def run(self):
self.bus = MyBus(Broker.__name__) # Create the instance here, in the child
self.bus.subscribe("main", self.check)
self.bus.start()
self.bus.block(interval=0.01)
只要你不需要在父级访问broker.bus
,这是更简单的选择。
我正在尝试使用 this example 作为我的 cherrypy 应用程序排队系统的模板。
我能够将它从 python 2 转换为 python 3(将 from Queue import Empty
更改为 from queue import Empty
)并在 Ubuntu 中执行它。但是当我在 Windows 中执行它时,出现以下错误:
F:\workspace\test>python test.py
Traceback (most recent call last):
File "test.py", line 112, in <module>
broker.start()
File "C:\Anaconda3\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\Anaconda3\lib\multiprocessing\context.py", line 212, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Anaconda3\lib\multiprocessing\context.py", line 313, in _Popen
return Popen(process_obj)
File "C:\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
reduction.dump(process_obj, to_child)
File "C:\Anaconda3\lib\multiprocessing\reduction.py", line 59, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: cannot serialize '_io.TextIOWrapper' object
F:\workspace\test>Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Anaconda3\lib\multiprocessing\spawn.py", line 100, in spawn_main
new_handle = steal_handle(parent_pid, pipe_handle)
File "C:\Anaconda3\lib\multiprocessing\reduction.py", line 81, in steal_handle
_winapi.PROCESS_DUP_HANDLE, False, source_pid)
OSError: [WinError 87] The parameter is incorrect
完整代码如下:
# from http://www.defuze.org/archives/198-managing-your-process-with-the-cherrypy-bus.html
import sys
import logging
from logging import handlers
from cherrypy.process import wspbus
class MyBus(wspbus.Bus):
def __init__(self, name=""):
wspbus.Bus.__init__(self)
self.open_logger(name)
self.subscribe("log", self._log)
def exit(self):
wspbus.Bus.exit(self)
self.close_logger()
def open_logger(self, name=""):
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
h = logging.StreamHandler(sys.stdout)
h.setLevel(logging.INFO)
h.setFormatter(logging.Formatter("[%(asctime)s] %(name)s - %(levelname)s - %(message)s"))
logger.addHandler(h)
self.logger = logger
def close_logger(self):
for handler in self.logger.handlers:
handler.flush()
handler.close()
def _log(self, msg="", level=logging.INFO):
self.logger.log(level, msg)
import random
import string
from multiprocessing import Process
class Bank(object):
def __init__(self, queue):
self.bus = MyBus(Bank.__name__)
self.queue = queue
self.bus.subscribe("main", self.randomly_place_order)
self.bus.subscribe("exit", self.terminate)
def randomly_place_order(self):
order = random.sample(['BUY', 'SELL'], 1)[0]
code = random.sample(string.ascii_uppercase, 4)
amount = random.randint(0, 100)
message = "%s %s %d" % (order, ''.join(code), amount)
self.bus.log("Placing order: %s" % message)
self.queue.put(message)
def run(self):
self.bus.start()
self.bus.block(interval=0.01)
def terminate(self):
self.bus.unsubscribe("main", self.randomly_place_order)
self.bus.unsubscribe("exit", self.terminate)
from queue import Empty
class Broker(Process):
def __init__(self, queue):
Process.__init__(self)
self.queue = queue
self.bus = MyBus(Broker.__name__)
self.bus.subscribe("main", self.check)
def check(self):
try:
message = self.queue.get_nowait()
except Empty:
return
if message == "stop":
self.bus.unsubscribe("main", self.check)
self.bus.exit()
elif message.startswith("BUY"):
self.buy(*message.split(' ', 2)[1:])
elif message.startswith("SELL"):
self.sell(*message.split(' ', 2)[1:])
def run(self):
self.bus.start()
self.bus.block(interval=0.01)
def stop(self):
self.queue.put("stop")
def buy(self, code, amount):
self.bus.log("BUY order placed for %s %s" % (amount, code))
def sell(self, code, amount):
self.bus.log("SELL order placed for %s %s" % (amount, code))
if __name__ == '__main__':
from multiprocessing import Queue
queue = Queue()
broker = Broker(queue)
broker.start()
bank = Bank(queue)
bank.run()
问题是 MyBus
对象的某些部分不可 picklable,并且您正在将 MyBus
的实例保存到 Broker
实例。因为 Windows 缺少 fork()
支持,当您调用 broker.start()
时,broker
的整个状态必须在 multiprocessing
产生的子进程中被 pickle 并重新创建以执行broker.run
。它适用于 Linux,因为 Linux 支持 fork
;在这种情况下它不需要 pickle 任何东西——子进程一旦被分叉就会包含父进程的完整状态。
有两种方法可以解决这个问题。第一种也是更困难的方法是使您的 broker
实例可挑选。为此,您需要使 MyBus
可腌制。您现在遇到的错误是指 MyBus
上的 logger
属性,该属性不可 picklable。那个很容易修复;只需在 MyBus
中添加 __getstate__
/__setstate__
方法,用于控制对象如何 pickled/unpickled。如果我们在 pickle 时删除记录器,并在 unpickle 时重新创建它,我们将解决这个问题:
class MyBus(wspbus.Bus):
...
def __getstate__(self):
self_dict = self.__dict__
del self_dict['logger']
return self_dict
def __setstate__(self, d):
self.__dict__.update(d)
self.open_logger()
这行得通,但随后我们遇到了 另一个 酸洗错误:
Traceback (most recent call last):
File "async2.py", line 121, in <module>
broker.start()
File "C:\python34\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\python34\lib\multiprocessing\context.py", line 212, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\python34\lib\multiprocessing\context.py", line 313, in _Popen
return Popen(process_obj)
File "C:\python34\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
reduction.dump(process_obj, to_child)
File "C:\python34\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class 'cherrypy.process.wspbus._StateEnum.State'>: attribute lookup State on cherrypy.process.wspbus failed
原来cherrypy.process.wspbus._StateEnum.State
是[=15=继承的wspbus.Bus
class上的一个属性,是嵌套class,嵌套classes 不能被 pickle:
class _StateEnum(object):
class State(object):
name = None
def __repr__(self):
return "states.%s" % self.name
State
对象(惊喜)用于跟踪 Bus
实例的状态。由于我们在启动总线之前进行 pickling,我们可以在 pickle 时从对象中删除 state
属性,并在我们取消 pickle 时将其设置为 States.STOPPED。
class MyBus(wspbus.Bus):
def __init__(self, name=""):
wspbus.Bus.__init__(self)
self.open_logger(name)
self.subscribe("log", self._log)
def __getstate__(self):
self_dict = self.__dict__
del self_dict['logger']
del self_dict['state']
return self_dict
def __setstate__(self, d):
self.__dict__.update(d)
self.open_logger()
self.state = wspbus.states.STOPPED # Initialize to STOPPED
通过这些更改,代码可以按预期工作!唯一的限制是,如果公共汽车还没有启动,那么 pickle MyBus
是安全的,这对你的用例来说很好。
同样,这是一条艰难的道路。简单的方法是完全不需要 pickle MyBus
实例。您可以只在子进程中创建 MyBus
实例,而不是父进程:
class Broker(Process):
def __init__(self, queue):
Process.__init__(self)
self.queue = queue
...
def run(self):
self.bus = MyBus(Broker.__name__) # Create the instance here, in the child
self.bus.subscribe("main", self.check)
self.bus.start()
self.bus.block(interval=0.01)
只要你不需要在父级访问broker.bus
,这是更简单的选择。