How can I use a single query for every client connected to a websocket?

I am trying to keep a single database connection in my websocket server and send the data it returns to every connected client. Is that possible?

Here is my current code:

import asyncio
import aiopg
import websockets
import logging
import sys
import configparser

config = configparser.ConfigParser()
config.read('config/config.ini')

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logger = logging.getLogger('websockets.server')
logger2 = logging.getLogger('asyncio')
logger2.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
logger2.addHandler(logging.StreamHandler())

async def listen(websocket, path):

    async with aiopg.create_pool(config.get('default', 'connexion_bd'), maxsize=1, pool_recycle=0) as pool:
        async with pool.acquire() as conn1:
            async with conn1.cursor() as cur:

                await cur.execute(config.get('default', 'pg_listen'))
                while True:
                    msg = await conn1.notifies.get()
                    if msg.payload == 'finish':
                        return
                    else:
                        await websocket.send(msg.payload)


start_server = websockets.serve(listen, 'localhost', config.getint('default', 'port_websocket_server'))

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

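Currently, every time a client connects to my websocket, a new connection is opened to my database. Every client receives the same content, so the clients don't need their own database connections, only a connection to my websocket.

I tried splitting my code (connecting to another websocket that in turn connects to this code), but I ran into the same problem.

Any hints would be appreciated.

Thanks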

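Got it.

The call to websockets.serve needs to come after the query, so that the database connection is opened once and shared by every handler: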

import asyncio
import functools

import aiopg
import websockets
import logging
import sys
import configparser

config = configparser.ConfigParser()
config.read('config/config.ini')

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logger = logging.getLogger('websockets.server')
logger2 = logging.getLogger('asyncio')
logger2.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
logger2.addHandler(logging.StreamHandler())

# To start the server:
# env/bin/python3 websocket_server_aiopg.py

USERS = set()


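async def listen(websocket, path, conn1):
    # Register this client so it receives every broadcast.
    USERS.add(websocket)

    while True:
        # All handlers share conn1; whichever one gets the notification relays it to everyone.
        msg = await conn1.notifies.get()

        if msg.payload == 'finish':
            return
        else:
            # gather() accepts coroutines directly; asyncio.wait() rejects them on Python 3.11+.
            await asyncio.gather(*(user.send(msg.payload) for user in USERS), return_exceptions=True)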


async def run_server():
    async with aiopg.create_pool(config.get('default', 'connexion_bd')) as pool:
        async with pool.acquire() as conn1:
            async with conn1.cursor() as cur:
                await cur.execute(config.get('default', 'pg_listen'))
                async with websockets.serve(functools.partial(listen, conn1=conn1), 'localhost', config.getint('default', 'port_websocket_server')) as ws:
                    await asyncio.sleep(1_000_000_000)  # consider waiting on an exit condition instead

asyncio.get_event_loop().run_until_complete(run_server())
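
One thing to be aware of in the code above: every handler waits on the same conn1.notifies queue, so each notification is picked up by only one handler, which then relays it to all users. A possible refinement is to have a single reader task do the broadcasting. Below is a minimal sketch of that pattern using only asyncio, so it runs without a database. The asyncio.Queue stands in for conn1.notifies, and FakeClient, broadcast and demo are hypothetical names introduced for illustration; they are not part of aiopg or websockets.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a websocket connection; records what it is sent."""

    def __init__(self):
        self.received = []

    async def send(self, message):
        self.received.append(message)


async def broadcast(notifications, clients):
    """Single reader: take each notification once and send it to every client."""
    while True:
        payload = await notifications.get()
        if payload == 'finish':
            return  # same sentinel payload as in the code above
        # Copy the set so clients may join or leave while sends are in flight.
        await asyncio.gather(
            *(client.send(payload) for client in set(clients)),
            return_exceptions=True,  # one failed send must not stop the others
        )


async def demo():
    notifications = asyncio.Queue()  # stands in for conn1.notifies
    clients = {FakeClient(), FakeClient()}
    reader = asyncio.create_task(broadcast(notifications, clients))
    for payload in ('a', 'b', 'finish'):
        await notifications.put(payload)
    # Awaiting the reader gives an exit condition instead of a very long sleep.
    await reader
    return [client.received for client in clients]
```

Calling asyncio.run(demo()) returns one list per client, each holding the payloads 'a' and 'b' in order. In the real server, I would expect each websocket handler to just add itself to the shared set, wait until its connection closes, and then remove itself, while run_server awaits the reader task in place of asyncio.sleep. I have not tested that wiring against aiopg, so treat it as an assumption.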