尝试从侦听器 python 网络框架访问对象

Trying to access an object from a listener python web framework

异步的新手,所以这是我的问题,在此先感谢您。 大家好,很简单的问题,我可能想太多了。

我正在尝试在下面这些已注册到 sanic 主应用程序的已定义侦听器之外访问此 cassandra 客户端。

我需要会话才能使用将异步执行的更新查询。我可以从下面的 'setup_cassandra_session_listener' 方法明确连接和事件查询。但是很难弄清楚如何在外部调用这个 Cassandra 会话并隔离,以便我可以访问其他地方。

from aiocassandra import aiosession
from cassandra.cluster import Cluster
from sanic import Sanic
from config import  CLUSTER_HOST, TABLE_NAME, CASSANDRA_KEY_SPACE, CASSANDRA_PORT, DATA_CENTER, DEBUG_LEVEL,  LOGGER_FORMAT


log = logging.getLogger('sanic')
log.setLevel('INFO')


cassandra_cluster = None


def setup_cassandra_session_listener(app, loop):
    global cassandra_cluster

    cassandra_cluster = Cluster([CLUSTER_HOST], CASSANDRA_PORT, DATA_CENTER)
    session = cassandra_cluster.connect(CASSANDRA_KEY_SPACE)

    metadata = cassandra_cluster.metadata

    app.session = cassandra_cluster.connect(CASSANDRA_KEY_SPACE)
    log.info('Connected to cluster: ' + metadata.cluster_name)

    aiosession(session)
    app.cassandra = session


def teardown_cassandra_session_listener(app, loop):
    global cassandra_cluster
    cassandra_cluster.shutdown()


def register_cassandra(app: Sanic):
    app.listener('before_server_start')(setup_cassandra_session_listener)
    app.listener('after_server_stop')(teardown_cassandra_session_listener)

这是一个可以满足您需要的工作示例。它实际上不是 运行 Cassandra(因为我没有这样做的经验)。但是,原则上这应该适用于您需要在 运行ning 服务器的整个生命周期内管理的任何数据库连接。

from sanic import Sanic
from sanic.response import text

app = Sanic()


class DummyCluser:
    def connect(self):
        print("Connecting")
        return "session"

    def shutdown(self):
        print("Shutting down")


def setup_cassandra_session_listener(app, loop):
    # No global variables needed

    app.cluster = DummyCluser()
    app.session = app.cluster.connect()


def teardown_cassandra_session_listener(app, loop):
    app.cluster.shutdown()


def register_cassandra(app: Sanic):
    # Changed these listeners to be more friendly if running with and ASGI server
    app.listener('after_server_start')(setup_cassandra_session_listener)
    app.listener('before_server_stop')(teardown_cassandra_session_listener)


@app.get("/")
async def get(request):
    return text(app.session)


if __name__ == "__main__":
    register_cassandra(app)
    app.run(debug=True)

我们的想法是,您附加到您的 app 实例(就像您所做的那样),然后可以使用 request.app.

在您的路由中简单地访问它