使用 flask-socketio 和 multiprocessing thorws 编程 'LoopExit: This operation would block forever'
Program with flask-socketio and multiprocessing thorws 'LoopExit: This operation would block forever'
首先:我是python的绝对初学者,我以前写过PHP,所以如果我有什么不对的地方请告诉我。
我正在写一个应用程序。它应该通过 websockets 提供信息。我为此选择了 flask-socketio。在后台我想处理数据。因为我想让应用程序变小,所以我决定不使用像 Celery 这样的解决方案。
我已将代码缩短为:
# -*- coding: utf8 -*-
from flask import Flask, jsonify, abort, make_response, url_for, request, render_template
from flask.ext.socketio import SocketIO, emit
from multiprocessing import Pool
from multiprocessing.managers import BaseManager
import time
import os
def background_stuff(args):
while True:
try:
print args
time.sleep(1)
except Exception as e:
return e
thread = None
_pool = None
app = Flask(__name__)
app.debug = True
socketio = SocketIO(app)
@app.route('/', methods=['GET'])
def get_timers():
return 'timer'
if __name__=='__main__':
_pool = Pool(1)
if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
workers = _pool.apply_async(
func=background_stuff,
args=('do background stuff',),
)
socketio.run(app)
# app.run()
启动时,我收到以下消息:
python test/multitest.py
* Running on http://127.0.0.1:5000/
* Restarting with stat
do background stuff
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 763, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 336, in _handle_tasks
for taskseq, set_length in iter(taskqueue.get, None):
File "/usr/lib/python2.7/Queue.py", line 168, in get
self.not_empty.wait()
File "/usr/lib/python2.7/threading.py", line 340, in wait
waiter.acquire()
File "gevent/_semaphore.pyx", line 112, in gevent._semaphore.Semaphore.acquire (gevent/gevent._semaphore.c:3386)
File "/home/phil/work/ttimer/server/local/lib/python2.7/site-packages/gevent/hub.py", line 338, in switch
return greenlet.switch(self)
LoopExit: This operation would block forever
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
127.0.0.1 - - [2015-09-30 00:06:23] "GET / HTTP/1.1" 200 120 0.001860
do background stuff
do background stuff
do background stuff
do background stuff
^CProcess PoolWorker-1:
Process PoolWorker-1:
Traceback (most recent call last):
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
self._target(*self._args, **self._kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 113, in worker
File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
task = get()
result = (True, func(*args, **kwds))
File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
File "test/multitest.py", line 14, in background_stuff
KeyboardInterrupt
time.sleep(1)
KeyboardInterrupt
return recv()
KeyboardInterrupt
所以后台进程正在运行并响应 http 请求 (127.0.0.1 - - [2015-09-30 00:06:23] "GET / HTTP/1.1" 200 120 0.001860)。但是仅仅因为它似乎有效而忽略错误似乎并不是我的解决方案。有谁能告诉我我在这里做错了什么?
如果你说我不能那样做,你能告诉我为什么吗?我想学习和理解我做错了什么。
我阅读了一些关于 monkepatching 的内容,但所有建议都引发了更多或其他错误。我认为最好是解决第一个错误,而不是盲目地尝试修复。
python -V
Python 2.7.9
问候
更新
我添加了 2 行 monkeypatching,这就是我得到的:
$python multitest2.py
^CProcess PoolWorker-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
task = get()
File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
return recv()
KeyboardInterrupt
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 763, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 380, in _handle_results
task = get()
KeyboardInterrupt
* Running on http://127.0.0.1:5000/
* Restarting with stat
^CProcess PoolWorker-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 763, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 380, in _handle_results
task = get()
KeyboardInterrupt
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
task = get()
File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
return recv()
KeyboardInterrupt
do background stuff
FAILED to start flash policy server: [Errno 98] Address already in use: ('127.0.0.1', 10843)
$do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
启动时根本没有输出。多次点击 ctrl-c 后,我得到了背景内容输出。这一直持续到我通过 SIGKILL
终止 python 进程
更新 2
我希望看到的是
* Running on http://127.0.0.1:5000/
* Restarting with stat
do background stuff
do background stuff
do background stuff
紧接在脚本的 运行 之后。但是在我按下 ctrl-c 之前什么也没有发生。
首先,您需要知道您使用的Flask-SocketIO版本需要gevent,这是一个协程框架。将 gevent 的异步协程与多处理池一起使用是一种奇怪的组合。您正在使用 gevent,所以最有意义的是使用 gevent pool 功能,这样一切都是一致的。
现在关于这个问题,我认为这可能是由于没有在早期阶段对标准库猴子进行修补。我建议您在脚本的最顶部添加以下行(在您的导入上方,将它们设为第 1 行和第 2 行):
from gevent import monkey
monkey.patch_all()
这些将确保对线程、信号量等事物的标准库的任何调用都转到 gevent 实现。
更新:我试过你的例子。没有猴子补丁的原始版本对我来说工作正常,我没有看到你报告的 LoopExit 错误。正如您所报告的那样,添加猴子补丁可以防止后台进程 运行。
无论如何,我将您的脚本转换为使用 gevent.pool,这对我来说很可靠。这是编辑后的脚本:
from flask import Flask, jsonify, abort, make_response, url_for, request, render_template
from flask.ext.socketio import SocketIO, emit
from gevent.pool import Pool
import time
import os
def background_stuff(args):
while True:
try:
print args
time.sleep(1)
except Exception as e:
return e
thread = None
_pool = None
app = Flask(__name__)
app.debug = True
socketio = SocketIO(app)
@app.route('/', methods=['GET'])
def get_timers():
return 'timer'
if __name__=='__main__':
_pool = Pool(1)
workers = _pool.apply_async(
func=background_stuff,
args=('do background stuff',),
)
socketio.run(app)
希望对您有所帮助!
我阅读了有关 gevent 的教程并找到了一个简单干净的解决方案来满足我的需求:
# -*- coding: utf8 -*-
from flask import Flask
from flask.ext.socketio import SocketIO
import gevent
import os
def background_stuff():
while True:
try:
print 'doing background work ... '
gevent.sleep(1)
except Exception as e:
return e
app = Flask(__name__)
app.debug = True
socketio = SocketIO(app)
@app.route('/', methods=['GET'])
def get_timers():
return 'timer'
if __name__=='__main__':
if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
gevent.spawn(background_stuff)
socketio.run(app)
教程可以在这里找到:http://sdiehl.github.io/gevent-tutorial/#long-polling
它甚至讲述了 gevent 和多进程的问题:http://sdiehl.github.io/gevent-tutorial/#subprocess,但是因为我找到了一个适合我需要的简单解决方案,所以我没有尝试其他任何东西。
首先:我是python的绝对初学者,我以前写过PHP,所以如果我有什么不对的地方请告诉我。
我正在写一个应用程序。它应该通过 websockets 提供信息。我为此选择了 flask-socketio。在后台我想处理数据。因为我想让应用程序变小,所以我决定不使用像 Celery 这样的解决方案。
我已将代码缩短为:
# -*- coding: utf8 -*-
from flask import Flask, jsonify, abort, make_response, url_for, request, render_template
from flask.ext.socketio import SocketIO, emit
from multiprocessing import Pool
from multiprocessing.managers import BaseManager
import time
import os
def background_stuff(args):
while True:
try:
print args
time.sleep(1)
except Exception as e:
return e
thread = None
_pool = None
app = Flask(__name__)
app.debug = True
socketio = SocketIO(app)
@app.route('/', methods=['GET'])
def get_timers():
return 'timer'
if __name__=='__main__':
_pool = Pool(1)
if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
workers = _pool.apply_async(
func=background_stuff,
args=('do background stuff',),
)
socketio.run(app)
# app.run()
启动时,我收到以下消息:
python test/multitest.py
* Running on http://127.0.0.1:5000/
* Restarting with stat
do background stuff
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 763, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 336, in _handle_tasks
for taskseq, set_length in iter(taskqueue.get, None):
File "/usr/lib/python2.7/Queue.py", line 168, in get
self.not_empty.wait()
File "/usr/lib/python2.7/threading.py", line 340, in wait
waiter.acquire()
File "gevent/_semaphore.pyx", line 112, in gevent._semaphore.Semaphore.acquire (gevent/gevent._semaphore.c:3386)
File "/home/phil/work/ttimer/server/local/lib/python2.7/site-packages/gevent/hub.py", line 338, in switch
return greenlet.switch(self)
LoopExit: This operation would block forever
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
127.0.0.1 - - [2015-09-30 00:06:23] "GET / HTTP/1.1" 200 120 0.001860
do background stuff
do background stuff
do background stuff
do background stuff
^CProcess PoolWorker-1:
Process PoolWorker-1:
Traceback (most recent call last):
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
self._target(*self._args, **self._kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 113, in worker
File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
task = get()
result = (True, func(*args, **kwds))
File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
File "test/multitest.py", line 14, in background_stuff
KeyboardInterrupt
time.sleep(1)
KeyboardInterrupt
return recv()
KeyboardInterrupt
所以后台进程正在运行并响应 http 请求 (127.0.0.1 - - [2015-09-30 00:06:23] "GET / HTTP/1.1" 200 120 0.001860)。但是仅仅因为它似乎有效而忽略错误似乎并不是我的解决方案。有谁能告诉我我在这里做错了什么?
如果你说我不能那样做,你能告诉我为什么吗?我想学习和理解我做错了什么。
我阅读了一些关于 monkepatching 的内容,但所有建议都引发了更多或其他错误。我认为最好是解决第一个错误,而不是盲目地尝试修复。
python -V
Python 2.7.9
问候
更新
我添加了 2 行 monkeypatching,这就是我得到的:
$python multitest2.py
^CProcess PoolWorker-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
task = get()
File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
return recv()
KeyboardInterrupt
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 763, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 380, in _handle_results
task = get()
KeyboardInterrupt
* Running on http://127.0.0.1:5000/
* Restarting with stat
^CProcess PoolWorker-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 763, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 380, in _handle_results
task = get()
KeyboardInterrupt
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
task = get()
File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
return recv()
KeyboardInterrupt
do background stuff
FAILED to start flash policy server: [Errno 98] Address already in use: ('127.0.0.1', 10843)
$do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
do background stuff
启动时根本没有输出。多次点击 ctrl-c 后,我得到了背景内容输出。这一直持续到我通过 SIGKILL
终止 python 进程更新 2
我希望看到的是
* Running on http://127.0.0.1:5000/
* Restarting with stat
do background stuff
do background stuff
do background stuff
紧接在脚本的 运行 之后。但是在我按下 ctrl-c 之前什么也没有发生。
首先,您需要知道您使用的Flask-SocketIO版本需要gevent,这是一个协程框架。将 gevent 的异步协程与多处理池一起使用是一种奇怪的组合。您正在使用 gevent,所以最有意义的是使用 gevent pool 功能,这样一切都是一致的。
现在关于这个问题,我认为这可能是由于没有在早期阶段对标准库猴子进行修补。我建议您在脚本的最顶部添加以下行(在您的导入上方,将它们设为第 1 行和第 2 行):
from gevent import monkey
monkey.patch_all()
这些将确保对线程、信号量等事物的标准库的任何调用都转到 gevent 实现。
更新:我试过你的例子。没有猴子补丁的原始版本对我来说工作正常,我没有看到你报告的 LoopExit 错误。正如您所报告的那样,添加猴子补丁可以防止后台进程 运行。
无论如何,我将您的脚本转换为使用 gevent.pool,这对我来说很可靠。这是编辑后的脚本:
from flask import Flask, jsonify, abort, make_response, url_for, request, render_template
from flask.ext.socketio import SocketIO, emit
from gevent.pool import Pool
import time
import os
def background_stuff(args):
while True:
try:
print args
time.sleep(1)
except Exception as e:
return e
thread = None
_pool = None
app = Flask(__name__)
app.debug = True
socketio = SocketIO(app)
@app.route('/', methods=['GET'])
def get_timers():
return 'timer'
if __name__=='__main__':
_pool = Pool(1)
workers = _pool.apply_async(
func=background_stuff,
args=('do background stuff',),
)
socketio.run(app)
希望对您有所帮助!
我阅读了有关 gevent 的教程并找到了一个简单干净的解决方案来满足我的需求:
# -*- coding: utf8 -*-
from flask import Flask
from flask.ext.socketio import SocketIO
import gevent
import os
def background_stuff():
while True:
try:
print 'doing background work ... '
gevent.sleep(1)
except Exception as e:
return e
app = Flask(__name__)
app.debug = True
socketio = SocketIO(app)
@app.route('/', methods=['GET'])
def get_timers():
return 'timer'
if __name__=='__main__':
if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
gevent.spawn(background_stuff)
socketio.run(app)
教程可以在这里找到:http://sdiehl.github.io/gevent-tutorial/#long-polling
它甚至讲述了 gevent 和多进程的问题:http://sdiehl.github.io/gevent-tutorial/#subprocess,但是因为我找到了一个适合我需要的简单解决方案,所以我没有尝试其他任何东西。