如何使用 Flask 在请求处理线程中启动另一个线程?
How to start another thread in request handling thread with Flask?
首先,我尝试在这个网站上寻找答案。但是运气不好...
我想实现的是在请求处理线程中启动一个独立的线程来完成一些异步任务。比较棘手的是,在这个独立线程中需要进行一些数据库操作。
这是一个例子。包括五个文件。
project
|__manager.py
|__config.py
|__deployer
|__`__init__.py`
|__models.py
|__views.py
|__operators.py
详细代码如下...
# deployer/__init__.py
from flask import Flask
from deployer.models import db
def create_app():
app = Flask(__name__)
app.config.from_object(object_name)
db.init_app(app)
# Add route for index
@app.route('/')
def index():
return {'code': 200, 'message': 'OK'}
return app
# manager.py
from os import environ
from flask_script import Manager, Server
from deployer import create_app
from flask_restful import Api
from deployer.views import HostView
env = environ.get('APM_ENV', 'dev')
app = create_app('config.%sConfig' % env.capitalize())
api = Api(app)
api.add_resource(HostView, '/api/v1/hosts')
manager = Manager(app)
manager.add_command("server", Server(host='0.0.0.0', port=9527))
if __name__ == '__main__':
manager.run(default_command='server')
# deployer/views.py
from flask_restful import Resource, reqparse
from flask import jsonify
from deployer.models import db, Host
from deployer.operators import HostInitiator
parser = reqparse.RequestParser()
parser.add_argument('host', type=int, help='Specify an unique host.')
class HostView(Resource):
def get(self):
h = db.session.query(Host).filter(Host.id == 1).one()
return jsonify(
host_id=h.id,
host_code=h.code,
host_ip=h.ip_addr_v4
)
def post(self):
h = Host(
code='Harbor',
ip_addr_v4='10.10.10.199',
state='created'
)
db.session.add(h)
db.session.commit()
initiator = HostInitiator(host=h)
initiator.start()
return {
'code': 'Harbor',
'ip_addr_v4': '10.10.10.199',
'state': 'created'
}
# deployer/models.py
from sqlalchemy import Column, Integer, String
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class Host(db.Model):
__tablename__ = 'br_host'
id = Column(Integer, primary_key=True, autoincrement=True)
code = Column(String(128), index=True, nullable=False)
ip_addr_v4 = Column(String(15), nullable=False)
state = Column(String(16), nullable=False)
# deployer/operators.py
from threading import Thread
from deployer.models import db
class HostInitiator(Thread):
def __init__(self, host):
super().__init__()
self.host = host
def run(self):
# Update Host.state [created-->initating]
db.session.query(Host).filter(Host.id == self.host.id).update({'state': 'initating'})
db.session.commit()
# do some initiating things...
# Update Host.state [initating-->ready]
db.session.query(Host).filter(Host.id == self.host.id).update({'state': 'ready'})
db.session.commit()
上面的代码总是 outside application context
错误。错误消息表明在 HostInitiator
线程中不允许进行任何数据库操作。
它建议我推送上下文或将我的代码移动到视图函数中。我很痛苦,如果你们有任何建议,请帮忙。提前致谢。
代码对我有用
def test_multi_threading_query():
# module which i create Flask app instance
from app.main import app
# module which i create sqlalchemhy instance
from app.model.db import db, Post
with app.app_context():
posts = Post.query.all()
p = posts[0]
p.foo = 1
db.session.add(p)
db.session.commit()
print(p)
@api.route('/test')
def test_view():
from threading import Thread
t = Thread(target=test_multi_threading_query)
t.start()
return ''
# main.py
app = Flask(__main__)
#db.py
db = SQLAlchemy()
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
foo = db.Column(db.Integer)
首先,我尝试在这个网站上寻找答案。但是运气不好...
我想实现的是在请求处理线程中启动一个独立的线程来完成一些异步任务。比较棘手的是,在这个独立线程中需要进行一些数据库操作。
这是一个例子。包括五个文件。
project
|__manager.py
|__config.py
|__deployer
|__`__init__.py`
|__models.py
|__views.py
|__operators.py
详细代码如下...
# deployer/__init__.py
from flask import Flask
from deployer.models import db
def create_app():
app = Flask(__name__)
app.config.from_object(object_name)
db.init_app(app)
# Add route for index
@app.route('/')
def index():
return {'code': 200, 'message': 'OK'}
return app
# manager.py
from os import environ
from flask_script import Manager, Server
from deployer import create_app
from flask_restful import Api
from deployer.views import HostView
env = environ.get('APM_ENV', 'dev')
app = create_app('config.%sConfig' % env.capitalize())
api = Api(app)
api.add_resource(HostView, '/api/v1/hosts')
manager = Manager(app)
manager.add_command("server", Server(host='0.0.0.0', port=9527))
if __name__ == '__main__':
manager.run(default_command='server')
# deployer/views.py
from flask_restful import Resource, reqparse
from flask import jsonify
from deployer.models import db, Host
from deployer.operators import HostInitiator
parser = reqparse.RequestParser()
parser.add_argument('host', type=int, help='Specify an unique host.')
class HostView(Resource):
def get(self):
h = db.session.query(Host).filter(Host.id == 1).one()
return jsonify(
host_id=h.id,
host_code=h.code,
host_ip=h.ip_addr_v4
)
def post(self):
h = Host(
code='Harbor',
ip_addr_v4='10.10.10.199',
state='created'
)
db.session.add(h)
db.session.commit()
initiator = HostInitiator(host=h)
initiator.start()
return {
'code': 'Harbor',
'ip_addr_v4': '10.10.10.199',
'state': 'created'
}
# deployer/models.py
from sqlalchemy import Column, Integer, String
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class Host(db.Model):
__tablename__ = 'br_host'
id = Column(Integer, primary_key=True, autoincrement=True)
code = Column(String(128), index=True, nullable=False)
ip_addr_v4 = Column(String(15), nullable=False)
state = Column(String(16), nullable=False)
# deployer/operators.py
from threading import Thread
from deployer.models import db
class HostInitiator(Thread):
def __init__(self, host):
super().__init__()
self.host = host
def run(self):
# Update Host.state [created-->initating]
db.session.query(Host).filter(Host.id == self.host.id).update({'state': 'initating'})
db.session.commit()
# do some initiating things...
# Update Host.state [initating-->ready]
db.session.query(Host).filter(Host.id == self.host.id).update({'state': 'ready'})
db.session.commit()
上面的代码总是 outside application context
错误。错误消息表明在 HostInitiator
线程中不允许进行任何数据库操作。
它建议我推送上下文或将我的代码移动到视图函数中。我很痛苦,如果你们有任何建议,请帮忙。提前致谢。
代码对我有用
def test_multi_threading_query():
# module which i create Flask app instance
from app.main import app
# module which i create sqlalchemhy instance
from app.model.db import db, Post
with app.app_context():
posts = Post.query.all()
p = posts[0]
p.foo = 1
db.session.add(p)
db.session.commit()
print(p)
@api.route('/test')
def test_view():
from threading import Thread
t = Thread(target=test_multi_threading_query)
t.start()
return ''
# main.py
app = Flask(__main__)
#db.py
db = SQLAlchemy()
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
foo = db.Column(db.Integer)