asyncio/aiohttp 的多个 Websocket 流式传输

multiple Websocket streaming with asyncio/aiohttp

我正在尝试使用 Python 的 asyncio 和 aiohttp 订阅多个 WebSocket 数据流。

当我运行下面的代码时,它只打印“a”,控制台中没有任何其他输出。它没有抛出任何错误,而且由于这是异步代码,我也无法逐步调试。

我想弄清楚问题出在哪里,如果有人能提供帮助,我将不胜感激。

import aiohttp
import asyncio

async def coro(event, item1, item2):
    """Open a WebSocket, send two JSON payloads, then print every message received.

    Args:
        event: Object whose ``set()`` is called right after the connection
            context is entered (an ``asyncio.Event`` in the caller below).
        item1: First payload passed to ``ws.send_json``.
        item2: Second payload passed to ``ws.send_json``.
    """
    print("a")
    # NOTE(review): ``ws_connect`` is looked up on the ``ClientSession`` class
    # itself, not on a session instance, so no session object is supplied as
    # ``self``. Presumably this call fails before "b" is printed and before
    # ``event.set()`` runs -- confirm against the aiohttp client reference.
    async with aiohttp.ClientSession.ws_connect(url='url') as ws:
        # Signal the waiter that the connection context has been entered.
        event.set()
        print("b")
        # Both sends are scheduled together; gather waits for both to finish.
        await asyncio.gather(ws.send_json(item1),
                             ws.send_json(item2))
        # Print each incoming message until the iteration over ``ws`` ends.
        async for msg in ws:
            print("c")
            print(msg)

async def ws_connect(item1, item2):
    """Start ``coro`` as a background task and return it once its event is set.

    Args:
        item1: First payload forwarded to ``coro``.
        item2: Second payload forwarded to ``coro``.

    Returns:
        The ``asyncio.Task`` that is running ``coro``.
    """
    event = asyncio.Event()
    task = asyncio.create_task(coro(event, item1, item2))
    # The event is local and only handed to ``coro``; nothing else can set
    # it. If ``coro`` ends before calling ``event.set()``, this wait never
    # returns and the task's outcome is never inspected here.
    await event.wait()  # suspend this coroutine until event.set() is called
    return task

async def main():
    """Subscribe to the "bar" and "foo" channels and wait for the listener task."""
    # One subscribe request per channel, in the order they are sent.
    subscriptions = [
        {"method": "subscribe", "params": {"channel": channel}}
        for channel in ("bar", "foo")
    ]
    listener = await ws_connect(*subscriptions)
    # Block until the background listener task finishes.
    await listener

# Script entry point: run the main() coroutine to completion.
asyncio.run(main())

您调用 ws_connect 的方式不对:它需要通过 ClientSession 的实例来调用,因此要先创建会话对象,再用该对象打开 WebSocket。正确方式:

async with aiohttp.ClientSession() as session:
    async with session.ws_connect('url') as was:
        ...

完整示例:

import aiohttp
import asyncio

async def coro(event, item1, item2):
    """Connect to the echo WebSocket, send two JSON payloads, and print replies.

    Args:
        event: Object whose ``set()`` is called as soon as the WebSocket
            context has been entered.
        item1: First payload passed to ``ws.send_json``.
        item2: Second payload passed to ``ws.send_json``.
    """
    print("a")
    # The session is entered first, then the WebSocket opened through it;
    # both are closed in reverse order when the block exits.
    async with aiohttp.ClientSession() as session, \
            session.ws_connect('wss://echo.websocket.org') as ws:
        # Let the caller know the connection context is active.
        event.set()
        print("b")
        # Schedule both sends together and wait for both to finish.
        pending_sends = [ws.send_json(payload) for payload in (item1, item2)]
        await asyncio.gather(*pending_sends)
        # Print every message until iteration over the socket ends.
        async for incoming in ws:
            print("c")
            print(incoming)


async def ws_connect(item1, item2):
    """Start ``coro`` in the background and return its task once it is connected.

    Waits until ``coro`` sets its event. If ``coro`` finishes first without
    setting the event (for example because connecting raised), the failure
    is surfaced here instead of waiting forever on an event nobody will set.

    Args:
        item1: First payload forwarded to ``coro``.
        item2: Second payload forwarded to ``coro``.

    Returns:
        The ``asyncio.Task`` that is running ``coro``.

    Raises:
        Exception: Whatever ``coro`` raised, if it failed before setting
            the event.
    """
    event = asyncio.Event()
    task = asyncio.create_task(coro(event, item1, item2))
    waiter = asyncio.ensure_future(event.wait())
    try:
        # Wake up on whichever happens first: the event being set, or the
        # task ending on its own.
        await asyncio.wait({task, waiter}, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # No-op if the waiter already completed; otherwise avoid leaking it.
        waiter.cancel()
    if task.done() and not event.is_set():
        # The task ended before signalling; re-raise its exception (if any)
        # so the caller sees the error instead of a silent hang.
        task.result()
    return task

async def main():
    """Subscribe to the "bar" and "foo" channels and wait for the listener task."""

    def subscribe_to(channel):
        # Build the subscribe request for a single channel.
        return {"method": "subscribe", "params": {"channel": channel}}

    listener = await ws_connect(subscribe_to("bar"), subscribe_to("foo"))
    # Block until the background listener task finishes.
    await listener

if __name__ == "__main__":
    # Only start the event loop when executed as a script, so importing
    # this module does not open a network connection as a side effect.
    asyncio.run(main())