Error in parallelizing for loop inside flask app (TypeError: can't pickle function objects)

I have a Flask app in which I need to parallelize a for loop. So far I have been using the approach described in https://blog.dominodatalab.com/simple-parallelization/, but when I use it inside the Flask app it fails. Below is a sample code snippet.

I start the application by running the file below.

#!flask/bin/python
from app import app
app.run(host='0.0.0.0',port=5010)

app/__init__.py is as follows:

from flask import Flask

app = Flask(__name__)
app.config.from_object('config')
from app import views

I'm calling the asynchronous part from a route function in views.py:
@app.route('/kNearest')
def k_nearest():
    print "in func call"
    requested_author_id = session['requested_author_id']
    complete_ts = ast.literal_eval(session['complete_ts'])
    author_tags = ast.literal_eval(session['prof_labels'])
    author_requested = session['author_requested']
    a = AuthorForecaster()
    k_nearest = a.get_nearest_k(requested_author_id, complete_ts, author_tags)
    return render_template("nearest.html",
                               title="Closest neighbours",
                               author_name=author_requested,
                               neighbours=k_nearest
                               )

get_nearest_k() contains the asynchronous code:

from joblib import Parallel, delayed
import multiprocessing

# what are your inputs, and what operation do you want to 
# perform on each input. For example...
inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
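The snippet above works as a standalone script because processInput is defined at module level. Judging from the traceback below, the worker function in the Flask app is defined inside get_nearest_k() (or another enclosing scope), and the default pickler cannot serialize nested functions. A minimal stdlib sketch of the difference (hypothetical names; not the app's actual code):

```python
import pickle

def square(x):
    # Module-level functions pickle by reference (module name + function
    # name), so they can be sent to worker processes.
    return x * x

def make_worker():
    def inner(x):
        # A function defined inside another function cannot be looked up
        # by name at the module level, so the default pickler rejects it.
        return x * x
    return inner

# Round-trips fine:
assert pickle.loads(pickle.dumps(square))(3) == 9

# Fails the same way processInput does inside get_nearest_k:
try:
    pickle.dumps(make_worker())
    picklable = True
except Exception:  # TypeError on Python 2, AttributeError on Python 3
    picklable = False

print("nested function picklable:", picklable)
```

The same restriction applies to lambdas, bound methods of unpicklable objects, and functions defined inside a Flask view.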

Below is the error I get:

Traceback (most recent call last):
File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1836, in __call__
    return self.wsgi_app(environ, start_response)
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1820, in wsgi_app
    response = self.make_response(self.handle_exception(e))
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1403, in handle_exception
    reraise(exc_type, exc_value, tb)
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1817, in wsgi_app
    response = self.full_dispatch_request()
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1477, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1381, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1475, in full_dispatch_request
    rv = self.dispatch_request()
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1461, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/Users/arun/citations_project/citationswebsite/app/views.py", line 71, in k_nearest
    k_nearest = a.get_nearest_k(requested_author_id, complete_ts, author_tags)
  File "/Users/arun/citations_project/citationswebsite/app/timeseries_api.py", line 206, in get_nearest_k
    distances = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in top_1000_from_good_ones)
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/joblib/parallel.py", line 758, in __call__
    while self.dispatch_one_batch(iterator):
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/joblib/parallel.py", line 603, in dispatch_one_batch
    tasks = BatchedCalls(itertools.islice(iterator, batch_size))
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/joblib/parallel.py", line 127, in __init__
    self.items = list(iterator_slice)
  File "/Users/arun/citations_project/citationswebsite/app/timeseries_api.py", line 206, in <genexpr>
    distances = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in top_1000_from_good_ones)
  File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/joblib/parallel.py", line 183, in delayed
    pickle.dumps(function)
  File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle function objects

This happens because multiprocessing (which joblib uses under the hood) serializes functions with pickle, which cannot handle lambdas or functions defined inside other functions. An alternative is the pathos library, whose pools are backed by dill and can serialize such functions:

import pathos.multiprocessing as mp
p = mp.Pool(4)  # pool with four worker processes
p.map(lambda x: x**2, range(10))

参考:http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization