多处理 apply_async() 在 Ubuntu 上不工作
Multiprocessing apply_async() not working on Ubuntu
我运行将此代码作为 CherryPy Web 服务在 Mac OS X 和 Ubuntu 14.04 上运行。通过在 python3 上使用 multiprocessing
我想以异步方式在 Process Pool
.
中启动静态方法 worker()
相同的代码 运行 在 Mac OS X 上完美无缺,在 Ubuntu 14.04 worker()
中没有 运行。 IE。通过调试 POST
方法中的代码,我可以看到每一行都已执行 - 来自
reqid = str(uuid.uuid4())
至
return handle_error(202, "Request ID: " + reqid)
在 Ubuntu 14.04 中启动相同的代码,它没有 运行 worker()
方法,甚至没有方法顶部的 print()
(这将被记录)。
相关代码如下(我只省略了handle_error()
方法):
import cherrypy
import json
from lib import get_parameters, handle_error
from multiprocessing import Pool
import os
from pymatbridge import Matlab
import requests
import shutil
import uuid
from xml.etree import ElementTree
class Schedule(object):
exposed = True
def __init__(self, mlab_path, pool):
self.mlab_path = mlab_path
self.pool = pool
def POST(self, *paths, **params):
if validate(cherrypy.request.headers):
try:
reqid = str(uuid.uuid4())
path = os.path.join("results", reqid)
os.makedirs(path)
wargs = [(self.mlab_path, reqid)]
self.pool.apply_async(Schedule.worker, wargs)
return handle_error(202, "Request ID: " + reqid)
except:
return handle_error(500, "Internal Server Error")
else:
return handle_error(401, "Unauthorized")
#### this is not executed ####
@staticmethod
def worker(args):
mlab_path, reqid = args
mlab = Matlab(executable=mlab_path)
mlab.start()
mlab.run_code("cd mlab")
mlab.run_code("sched")
a = mlab.get_variable("a")
mlab.stop()
return reqid
####
# to start the Web Service
if __name__ == "__main__":
# start Web Service with some configuration
global_conf = {
"global": {
"server.environment": "production",
"engine.autoreload.on": True,
"engine.autoreload.frequency": 5,
"server.socket_host": "0.0.0.0",
"log.screen": False,
"log.access_file": "site.log",
"log.error_file": "site.log",
"server.socket_port": 8084
}
}
cherrypy.config.update(global_conf)
conf = {
"/": {
"request.dispatch": cherrypy.dispatch.MethodDispatcher(),
"tools.encode.debug": True,
"request.show_tracebacks": False
}
}
pool = Pool(3)
cherrypy.tree.mount(Schedule('matlab', pool), "/sched", conf)
# activate signal handler
if hasattr(cherrypy.engine, "signal_handler"):
cherrypy.engine.signal_handler.subscribe()
# start serving pages
cherrypy.engine.start()
cherrypy.engine.block()
我解决了将方法从 @staticmethod
更改为 @classmethod
的问题。现在作业在 ProcessPool
内运行。我发现类方法在这种情况下更有用,如 here.
所述
谢谢。
你的逻辑在向你隐瞒问题。 apply_async
方法 returns 一个 AsyncResult 对象,它充当您刚刚安排的异步任务的处理程序。当您忽略计划任务的结果时,整个事情看起来像 "failing silently".
如果您尝试从该任务中获取结果,您就会发现真正的问题所在。
handler = self.pool.apply_async(Schedule.worker, wargs)
handler.get()
... traceback here ...
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
简而言之,您必须确保传递给 Pool 的参数是 Picklable。
Instance 和 class 方法是 Picklable 如果它们所属的 object/class 也是可 picklable 的。静态方法不可 picklable,因为它们失去了与对象本身的关联,因此 pickle 库无法正确序列化它们。
一般而言,最好避免安排 multiprocessing.Pool
与顶级定义函数不同的任何内容。
对于 运行 使用 Cherrypy 的后台任务,最好使用像 Celery or RQ 这样的异步任务队列管理器。此服务非常易于安装,运行,您的任务将 运行 在一个完全独立的进程中,如果您需要扩展,因为您的负载正在增加,它将非常简单。
您有一个简单的 Cherrypy 示例 。
我运行将此代码作为 CherryPy Web 服务在 Mac OS X 和 Ubuntu 14.04 上运行。通过在 python3 上使用 multiprocessing
我想以异步方式在 Process Pool
.
worker()
相同的代码 运行 在 Mac OS X 上完美无缺,在 Ubuntu 14.04 worker()
中没有 运行。 IE。通过调试 POST
方法中的代码,我可以看到每一行都已执行 - 来自
reqid = str(uuid.uuid4())
至
return handle_error(202, "Request ID: " + reqid)
在 Ubuntu 14.04 中启动相同的代码,它没有 运行 worker()
方法,甚至没有方法顶部的 print()
(这将被记录)。
相关代码如下(我只省略了handle_error()
方法):
import cherrypy
import json
from lib import get_parameters, handle_error
from multiprocessing import Pool
import os
from pymatbridge import Matlab
import requests
import shutil
import uuid
from xml.etree import ElementTree
class Schedule(object):
exposed = True
def __init__(self, mlab_path, pool):
self.mlab_path = mlab_path
self.pool = pool
def POST(self, *paths, **params):
if validate(cherrypy.request.headers):
try:
reqid = str(uuid.uuid4())
path = os.path.join("results", reqid)
os.makedirs(path)
wargs = [(self.mlab_path, reqid)]
self.pool.apply_async(Schedule.worker, wargs)
return handle_error(202, "Request ID: " + reqid)
except:
return handle_error(500, "Internal Server Error")
else:
return handle_error(401, "Unauthorized")
#### this is not executed ####
@staticmethod
def worker(args):
mlab_path, reqid = args
mlab = Matlab(executable=mlab_path)
mlab.start()
mlab.run_code("cd mlab")
mlab.run_code("sched")
a = mlab.get_variable("a")
mlab.stop()
return reqid
####
# to start the Web Service
if __name__ == "__main__":
# start Web Service with some configuration
global_conf = {
"global": {
"server.environment": "production",
"engine.autoreload.on": True,
"engine.autoreload.frequency": 5,
"server.socket_host": "0.0.0.0",
"log.screen": False,
"log.access_file": "site.log",
"log.error_file": "site.log",
"server.socket_port": 8084
}
}
cherrypy.config.update(global_conf)
conf = {
"/": {
"request.dispatch": cherrypy.dispatch.MethodDispatcher(),
"tools.encode.debug": True,
"request.show_tracebacks": False
}
}
pool = Pool(3)
cherrypy.tree.mount(Schedule('matlab', pool), "/sched", conf)
# activate signal handler
if hasattr(cherrypy.engine, "signal_handler"):
cherrypy.engine.signal_handler.subscribe()
# start serving pages
cherrypy.engine.start()
cherrypy.engine.block()
我解决了将方法从 @staticmethod
更改为 @classmethod
的问题。现在作业在 ProcessPool
内运行。我发现类方法在这种情况下更有用,如 here.
谢谢。
你的逻辑在向你隐瞒问题。 apply_async
方法 returns 一个 AsyncResult 对象,它充当您刚刚安排的异步任务的处理程序。当您忽略计划任务的结果时,整个事情看起来像 "failing silently".
如果您尝试从该任务中获取结果,您就会发现真正的问题所在。
handler = self.pool.apply_async(Schedule.worker, wargs)
handler.get()
... traceback here ...
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
简而言之,您必须确保传递给 Pool 的参数是 Picklable。
Instance 和 class 方法是 Picklable 如果它们所属的 object/class 也是可 picklable 的。静态方法不可 picklable,因为它们失去了与对象本身的关联,因此 pickle 库无法正确序列化它们。
一般而言,最好避免安排 multiprocessing.Pool
与顶级定义函数不同的任何内容。
对于 运行 使用 Cherrypy 的后台任务,最好使用像 Celery or RQ 这样的异步任务队列管理器。此服务非常易于安装,运行,您的任务将 运行 在一个完全独立的进程中,如果您需要扩展,因为您的负载正在增加,它将非常简单。
您有一个简单的 Cherrypy 示例