在 Flask 的应用上下文中保持 py2neo 连接
Keeping py2neo connection in Flask's application context
我在我的烧瓶项目中使用没有 flask extension (since it behaves quite strangely) 的 py2neo。
我想要做的是在应用程序上下文中保留我的数据库连接(因为它在整个请求处理过程中被保留,对吧?)。
根据 flask 的文档,我有以下代码:
def get_neo4j():
with app.app_context():
neo4j = getattr(g, '_neo4j', None)
if neo4j is None:
connection_string = http blah blah blah
neo4j = g._neo4j = Graph(connection_string)
return neo4j
这是正确的吗?我如何编辑以上代码以从 werkzeug 的本地代理中受益?
好的,
在阅读了关于 extensions 的 Flask 文档之后,开发我自己的 class 来保持连接是很简单的:
from py2neo import Graph
from flask import current_app
try:
from flask import _app_ctx_stack as stack
except ImportError:
from flask import _request_ctx_stack as stack
class Neo4jConnection(object):
def __init__(self, app=None):
self.app = app
if app is not None:
self.init_app(app)
def init_app(self, app):
if hasattr(app, 'teardown_appcontext'):
app.teardown_appcontext(self.teardown)
else:
app.teardown_request(self.teardown)
def connect(self):
# Read configuations from app config
return Graph(bolt=True,
host=current_app.config['NEO4J_SERVER_ADDRESS'],
bolt_port=current_app.config['NEO4J_PORT'],
user=current_app.config['NEO4J_USER'],
password=current_app.config['NEO4J_PASSWORD'],)
def teardown(self, exception):
ctx = stack.top
if hasattr(ctx, 'neo4j_db'):
ctx.neo4j_db = None
@property
def connection(self):
ctx = stack.top
if ctx is not None:
if not hasattr(ctx, 'neo4j_db'):
ctx.neo4j_db = self.connect()
return ctx.neo4j_db
现在使用工厂模式让它工作:
初始化:
neo4j = Neo4jConnection()
neo4j.init_app(app)
用法:
myneo = Neo4jConnection().connection
myneo.push(.....)
我已经测试过了。希望是正确的。
我在我的烧瓶项目中使用没有 flask extension (since it behaves quite strangely) 的 py2neo。
我想要做的是在应用程序上下文中保留我的数据库连接(因为它在整个请求处理过程中被保留,对吧?)。
根据 flask 的文档,我有以下代码:
def get_neo4j():
with app.app_context():
neo4j = getattr(g, '_neo4j', None)
if neo4j is None:
connection_string = http blah blah blah
neo4j = g._neo4j = Graph(connection_string)
return neo4j
这是正确的吗?我如何编辑以上代码以从 werkzeug 的本地代理中受益?
好的, 在阅读了关于 extensions 的 Flask 文档之后,开发我自己的 class 来保持连接是很简单的:
from py2neo import Graph
from flask import current_app
try:
from flask import _app_ctx_stack as stack
except ImportError:
from flask import _request_ctx_stack as stack
class Neo4jConnection(object):
def __init__(self, app=None):
self.app = app
if app is not None:
self.init_app(app)
def init_app(self, app):
if hasattr(app, 'teardown_appcontext'):
app.teardown_appcontext(self.teardown)
else:
app.teardown_request(self.teardown)
def connect(self):
# Read configuations from app config
return Graph(bolt=True,
host=current_app.config['NEO4J_SERVER_ADDRESS'],
bolt_port=current_app.config['NEO4J_PORT'],
user=current_app.config['NEO4J_USER'],
password=current_app.config['NEO4J_PASSWORD'],)
def teardown(self, exception):
ctx = stack.top
if hasattr(ctx, 'neo4j_db'):
ctx.neo4j_db = None
@property
def connection(self):
ctx = stack.top
if ctx is not None:
if not hasattr(ctx, 'neo4j_db'):
ctx.neo4j_db = self.connect()
return ctx.neo4j_db
现在使用工厂模式让它工作:
初始化:
neo4j = Neo4jConnection()
neo4j.init_app(app)
用法:
myneo = Neo4jConnection().connection
myneo.push(.....)
我已经测试过了。希望是正确的。