尝试从侦听器 python 网络框架访问对象
Trying to access an object from a listener python web framework
异步的新手,所以这是我的问题,在此先感谢您。
大家好,很简单的问题,我可能想太多了。
我正在尝试在下面这些已注册到 sanic 主应用程序的已定义侦听器之外访问此 cassandra 客户端。
我需要会话才能使用将异步执行的更新查询。我可以从下面的 'setup_cassandra_session_listener' 方法明确连接和事件查询。但是很难弄清楚如何在外部调用这个 Cassandra 会话并隔离,以便我可以访问其他地方。
from aiocassandra import aiosession
from cassandra.cluster import Cluster
from sanic import Sanic
from config import CLUSTER_HOST, TABLE_NAME, CASSANDRA_KEY_SPACE, CASSANDRA_PORT, DATA_CENTER, DEBUG_LEVEL, LOGGER_FORMAT
log = logging.getLogger('sanic')
log.setLevel('INFO')
cassandra_cluster = None
def setup_cassandra_session_listener(app, loop):
global cassandra_cluster
cassandra_cluster = Cluster([CLUSTER_HOST], CASSANDRA_PORT, DATA_CENTER)
session = cassandra_cluster.connect(CASSANDRA_KEY_SPACE)
metadata = cassandra_cluster.metadata
app.session = cassandra_cluster.connect(CASSANDRA_KEY_SPACE)
log.info('Connected to cluster: ' + metadata.cluster_name)
aiosession(session)
app.cassandra = session
def teardown_cassandra_session_listener(app, loop):
global cassandra_cluster
cassandra_cluster.shutdown()
def register_cassandra(app: Sanic):
app.listener('before_server_start')(setup_cassandra_session_listener)
app.listener('after_server_stop')(teardown_cassandra_session_listener)
这是一个可以满足您需要的工作示例。它实际上不是 运行 Cassandra(因为我没有这样做的经验)。但是,原则上这应该适用于您需要在 运行ning 服务器的整个生命周期内管理的任何数据库连接。
from sanic import Sanic
from sanic.response import text
app = Sanic()
class DummyCluser:
def connect(self):
print("Connecting")
return "session"
def shutdown(self):
print("Shutting down")
def setup_cassandra_session_listener(app, loop):
# No global variables needed
app.cluster = DummyCluser()
app.session = app.cluster.connect()
def teardown_cassandra_session_listener(app, loop):
app.cluster.shutdown()
def register_cassandra(app: Sanic):
# Changed these listeners to be more friendly if running with and ASGI server
app.listener('after_server_start')(setup_cassandra_session_listener)
app.listener('before_server_stop')(teardown_cassandra_session_listener)
@app.get("/")
async def get(request):
return text(app.session)
if __name__ == "__main__":
register_cassandra(app)
app.run(debug=True)
我们的想法是,您附加到您的 app
实例(就像您所做的那样),然后可以使用 request.app
.
在您的路由中简单地访问它
异步的新手,所以这是我的问题,在此先感谢您。 大家好,很简单的问题,我可能想太多了。
我正在尝试在下面这些已注册到 sanic 主应用程序的已定义侦听器之外访问此 cassandra 客户端。
我需要会话才能使用将异步执行的更新查询。我可以从下面的 'setup_cassandra_session_listener' 方法明确连接和事件查询。但是很难弄清楚如何在外部调用这个 Cassandra 会话并隔离,以便我可以访问其他地方。
from aiocassandra import aiosession
from cassandra.cluster import Cluster
from sanic import Sanic
from config import CLUSTER_HOST, TABLE_NAME, CASSANDRA_KEY_SPACE, CASSANDRA_PORT, DATA_CENTER, DEBUG_LEVEL, LOGGER_FORMAT
log = logging.getLogger('sanic')
log.setLevel('INFO')
cassandra_cluster = None
def setup_cassandra_session_listener(app, loop):
global cassandra_cluster
cassandra_cluster = Cluster([CLUSTER_HOST], CASSANDRA_PORT, DATA_CENTER)
session = cassandra_cluster.connect(CASSANDRA_KEY_SPACE)
metadata = cassandra_cluster.metadata
app.session = cassandra_cluster.connect(CASSANDRA_KEY_SPACE)
log.info('Connected to cluster: ' + metadata.cluster_name)
aiosession(session)
app.cassandra = session
def teardown_cassandra_session_listener(app, loop):
global cassandra_cluster
cassandra_cluster.shutdown()
def register_cassandra(app: Sanic):
app.listener('before_server_start')(setup_cassandra_session_listener)
app.listener('after_server_stop')(teardown_cassandra_session_listener)
这是一个可以满足您需要的工作示例。它实际上不是 运行 Cassandra(因为我没有这样做的经验)。但是,原则上这应该适用于您需要在 运行ning 服务器的整个生命周期内管理的任何数据库连接。
from sanic import Sanic
from sanic.response import text
app = Sanic()
class DummyCluser:
def connect(self):
print("Connecting")
return "session"
def shutdown(self):
print("Shutting down")
def setup_cassandra_session_listener(app, loop):
# No global variables needed
app.cluster = DummyCluser()
app.session = app.cluster.connect()
def teardown_cassandra_session_listener(app, loop):
app.cluster.shutdown()
def register_cassandra(app: Sanic):
# Changed these listeners to be more friendly if running with and ASGI server
app.listener('after_server_start')(setup_cassandra_session_listener)
app.listener('before_server_stop')(teardown_cassandra_session_listener)
@app.get("/")
async def get(request):
return text(app.session)
if __name__ == "__main__":
register_cassandra(app)
app.run(debug=True)
我们的想法是,您附加到您的 app
实例(就像您所做的那样),然后可以使用 request.app
.