多处理 apply_async() 在 Ubuntu 上不工作

Multiprocessing apply_async() not working on Ubuntu

我运行将此代码作为 CherryPy Web 服务在 Mac OS X 和 Ubuntu 14.04 上运行。通过在 python3 上使用 multiprocessing 我想以异步方式在 Process Pool.

中启动静态方法 worker()

相同的代码 运行 在 Mac OS X 上完美无缺,在 Ubuntu 14.04 worker() 中没有 运行。 IE。通过调试 POST 方法中的代码,我可以看到每一行都已执行 - 来自

reqid = str(uuid.uuid4())

return handle_error(202, "Request ID: " + reqid)

在 Ubuntu 14.04 中启动相同的代码,它没有 运行 worker() 方法,甚至没有方法顶部的 print()(这将被记录)。

相关代码如下(我只省略了handle_error()方法):

import cherrypy
import json
from lib import get_parameters, handle_error
from multiprocessing import Pool
import os
from pymatbridge import Matlab
import requests
import shutil
import uuid
from xml.etree import ElementTree

class Schedule(object):
    exposed = True

    def __init__(self, mlab_path, pool):
        self.mlab_path = mlab_path
        self.pool = pool

    def POST(self, *paths, **params):

        if validate(cherrypy.request.headers):

            try:
                reqid = str(uuid.uuid4())
                path = os.path.join("results", reqid)
                os.makedirs(path)
                wargs = [(self.mlab_path, reqid)]
                self.pool.apply_async(Schedule.worker, wargs)

                return handle_error(202, "Request ID: " + reqid)
            except:
                return handle_error(500, "Internal Server Error")
        else:
            return handle_error(401, "Unauthorized")

    #### this is not executed ####
    @staticmethod
    def worker(args):

        mlab_path, reqid = args
        mlab = Matlab(executable=mlab_path)
        mlab.start()

        mlab.run_code("cd mlab")
        mlab.run_code("sched")
        a = mlab.get_variable("a")

        mlab.stop()

        return reqid

    ####

# to start the Web Service
if __name__ == "__main__":

    # start Web Service with some configuration
    global_conf = {
           "global":    {
                            "server.environment": "production",
                            "engine.autoreload.on": True,
                            "engine.autoreload.frequency": 5,
                            "server.socket_host": "0.0.0.0",
                            "log.screen": False,
                            "log.access_file": "site.log",
                            "log.error_file": "site.log",
                            "server.socket_port": 8084
                        }
    }
    cherrypy.config.update(global_conf)
    conf = {
        "/": {
            "request.dispatch": cherrypy.dispatch.MethodDispatcher(),
            "tools.encode.debug": True,
            "request.show_tracebacks": False
        }
    }

    pool = Pool(3)

    cherrypy.tree.mount(Schedule('matlab', pool), "/sched", conf)

    # activate signal handler
    if hasattr(cherrypy.engine, "signal_handler"):
        cherrypy.engine.signal_handler.subscribe()

    # start serving pages
    cherrypy.engine.start()
    cherrypy.engine.block()

我解决了将方法从 @staticmethod 更改为 @classmethod 的问题。现在作业在 ProcessPool 内运行。我发现类方法在这种情况下更有用,如 here.

所述

谢谢。

你的逻辑在向你隐瞒问题。 apply_async 方法 returns 一个 AsyncResult 对象,它充当您刚刚安排的异步任务的处理程序。当您忽略计划任务的结果时,整个事情看起来像 "failing silently".

如果您尝试从该任务中获取结果,您就会发现真正的问题所在。

handler = self.pool.apply_async(Schedule.worker, wargs)
handler.get()

... traceback here ...
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

简而言之,您必须确保传递给 Pool 的参数是 Picklable

Instance 和 class 方法是 Picklable 如果它们所属的 object/class 也是可 picklable 的。静态方法不可 picklable,因为它们失去了与对象本身的关联,因此 pickle 库无法正确序列化它们。

一般而言,最好避免安排 multiprocessing.Pool 与顶级定义函数不同的任何内容。

对于 运行 使用 Cherrypy 的后台任务,最好使用像 Celery or RQ 这样的异步任务队列管理器。此服务非常易于安装,运行,您的任务将 运行 在一个完全独立的进程中,如果您需要扩展,因为您的负载正在增加,它将非常简单。

您有一个简单的 Cherrypy 示例