Error in parallelizing for loop inside flask app (TypeError: can't pickle function objects)
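I have a Flask application in which I need to parallelize a for loop. So far I have been using the approach described at https://blog.dominodatalab.com/simple-parallelization/, but it fails when I use it inside the Flask app. Below is a sample code snippet.
I start the application by running the following file: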
#!flask/bin/python
from app import app
app.run(host='0.0.0.0',port=5010)
app/__init__.py is as follows:
from flask import Flask
app = Flask(__name__)
app.config.from_object('config')
from app import views  # imported last because views.py itself imports app
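I call the asynchronous part from one of the route functions in views.py:
import ast
from flask import render_template, session
from app import app
from app.timeseries_api import AuthorForecaster  # module path as shown in the traceback below
@app.route('/kNearest')
def k_nearest():
    print("in func call")
    requested_author_id = session['requested_author_id']
    complete_ts = ast.literal_eval(session['complete_ts'])
    author_tags = ast.literal_eval(session['prof_labels'])
    author_requested = session['author_requested']
    a = AuthorForecaster()
    # get_nearest_k() runs the joblib Parallel loop that raises the error below
    k_nearest = a.get_nearest_k(requested_author_id, complete_ts, author_tags)
    return render_template("nearest.html",
                           title="Closest neighbours",
                           author_name=author_requested,
                           neighbours=k_nearest)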
get_nearest_k() contains the asynchronous code, which follows this pattern:
from joblib import Parallel, delayed
import multiprocessing
# what are your inputs, and what operation do you want to
# perform on each input. For example...
inputs = range(10)
def processInput(i):
    return i * i
num_cores = multiprocessing.cpu_count()
results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
Below is the error I get:
Traceback (most recent call last):
File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1836, in __call__
return self.wsgi_app(environ, start_response)
File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1820, in wsgi_app
response = self.make_response(self.handle_exception(e))
File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1403, in handle_exception
reraise(exc_type, exc_value, tb)
File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1817, in wsgi_app
response = self.full_dispatch_request()
File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1477, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1381, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1475, in full_dispatch_request
rv = self.dispatch_request()
File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/flask/app.py", line 1461, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/Users/arun/citations_project/citationswebsite/app/views.py", line 71, in k_nearest
k_nearest = a.get_nearest_k(requested_author_id, complete_ts, author_tags)
File "/Users/arun/citations_project/citationswebsite/app/timeseries_api.py", line 206, in get_nearest_k
distances = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in top_1000_from_good_ones)
File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/joblib/parallel.py", line 758, in __call__
while self.dispatch_one_batch(iterator):
File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/joblib/parallel.py", line 603, in dispatch_one_batch
tasks = BatchedCalls(itertools.islice(iterator, batch_size))
File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/joblib/parallel.py", line 127, in __init__
self.items = list(iterator_slice)
File "/Users/arun/citations_project/citationswebsite/app/timeseries_api.py", line 206, in <genexpr>
distances = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in top_1000_from_good_ones)
File "/Users/arun/citations_project/citationswebsite/flask/lib/python2.7/site-packages/joblib/parallel.py", line 183, in delayed
pickle.dumps(function)
File "/usr/local/Cellar/python/2.7.10_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle function objects
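The failing serialization step can be reproduced outside Flask with the standard pickle module. The sketch below (Python 3, standard library only; square, make_nested and is_picklable are hypothetical names) shows that a module-level function pickles fine, because pickle records it by module and name, while a lambda or a function defined inside another function does not. That the asker's processInput is actually defined inside get_nearest_k is an assumption, not something the question confirms.

```python
import pickle


def square(x):
    # Module-level function: pickle records it as a reference (module + name).
    return x * x


def make_nested():
    # Hypothetical helper defined inside another function, the way a worker
    # might be defined inside a method such as get_nearest_k.
    def nested_square(x):
        return x * x
    return nested_square


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exact exception type differs between Python versions.
        return False


if __name__ == "__main__":
    print(is_picklable(square))            # True
    print(is_picklable(make_nested()))     # False
    print(is_picklable(lambda x: x * x))   # False
```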
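This is caused by the limited way multiprocessing serializes functions: it uses the standard pickle module (so does joblib's delayed, which calls pickle.dumps(function) in the traceback above), and pickle stores a function only as a reference to its module and name. Lambdas and functions defined inside another function or method therefore typically can't be pickled. An alternative is the pathos library, whose multiprocessing module serializes with dill and can handle such functions: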
import pathos.multiprocessing as mp
p = mp.Pool(4) # Processing Pool with four processors
p.map(lambda x: x**2, range(10))
Reference: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
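If adding a dependency is not an option, one common standard-library workaround is to keep the worker function at module top level and bind any extra data with functools.partial; a partial of a module-level function pickles as long as its bound arguments are picklable. A minimal sketch, assuming Python 3 (the traceback shows 2.7, where Pool is not a context manager); squared_distance, nearest_distances and the sample vectors are hypothetical stand-ins for the asker's distance computation:

```python
import functools
from multiprocessing import Pool


def squared_distance(target, candidate):
    # Hypothetical per-item work. Defined at module level so pickle can
    # locate it by name in the worker processes.
    return sum((a - b) ** 2 for a, b in zip(target, candidate))


def nearest_distances(target, candidates, processes=4):
    # partial() of a module-level function is picklable as long as the
    # bound arguments (here, target) are picklable too.
    worker = functools.partial(squared_distance, target)
    with Pool(processes) as pool:
        return pool.map(worker, candidates)


if __name__ == "__main__":
    print(nearest_distances([0, 0], [[1, 1], [2, 0], [3, 4]]))  # [2, 4, 25]
```

The same idea, moving processInput out of the method to module level, should in many cases also let the original joblib call pickle it. With the spawn start method (the default on Windows and on macOS since Python 3.8), child processes re-import the main module, so the app.run(...) call in the startup script would need to sit under an `if __name__ == "__main__":` guard.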