asyncio/aiohttp 的多个 Websocket 流式传输
multiple Websocket streaming with asyncio/aiohttp
我正在尝试使用 python 的 asyncio
和 aiohttp
.
订阅多个 Websocket 流媒体
当我 运行 下面的代码时,它只打印“a”,但在控制台中没有其他输出。
它不会抛出任何错误,而且我无法逐步调试,因为它是 异步 代码。
我想弄清楚问题出在哪里,如果有人能提供帮助,我将不胜感激。
import aiohttp
import asyncio
async def coro(event, item1, item2):
print("a")
async with aiohttp.ClientSession.ws_connect(url='url') as ws:
event.set()
print("b")
await asyncio.gather(ws.send_json(item1),
ws.send_json(item2))
async for msg in ws:
print("c")
print(msg)
async def ws_connect(item1, item2):
event = asyncio.Event()
task = asyncio.create_task(coro(event, item1, item2))
await event.wait() # wait until the event is set() to True, while waiting, block
return task
async def main():
item1 = {
"method": "subscribe",
"params": {'channel': "bar"}
}
item2 = {
"method": "subscribe",
"params": {'channel': "foo"}
}
ws_task = await ws_connect(item1, item2)
await ws_task
asyncio.run(main())
您错误地调用了 ws_connect
。正确方式:
async with aiohttp.ClientSession() as session:
async with session.ws_connect('url') as was:
...
完整示例:
import aiohttp
import asyncio
async def coro(event, item1, item2):
print("a")
async with aiohttp.ClientSession() as session:
async with session.ws_connect('wss://echo.websocket.org') as ws:
event.set()
print("b")
await asyncio.gather(ws.send_json(item1),
ws.send_json(item2))
async for msg in ws:
print("c")
print(msg)
async def ws_connect(item1, item2):
event = asyncio.Event()
task = asyncio.create_task(coro(event, item1, item2))
await event.wait() # wait until the event is set() to True, while waiting, block
return task
async def main():
item1 = {
"method": "subscribe",
"params": {'channel': "bar"}
}
item2 = {
"method": "subscribe",
"params": {'channel': "foo"}
}
ws_task = await ws_connect(item1, item2)
await ws_task
asyncio.run(main())
我正在尝试使用 python 的 asyncio
和 aiohttp
.
当我 运行 下面的代码时,它只打印“a”,但在控制台中没有其他输出。 它不会抛出任何错误,而且我无法逐步调试,因为它是 异步 代码。
我想弄清楚问题出在哪里,如果有人能提供帮助,我将不胜感激。
import aiohttp
import asyncio
async def coro(event, item1, item2):
print("a")
async with aiohttp.ClientSession.ws_connect(url='url') as ws:
event.set()
print("b")
await asyncio.gather(ws.send_json(item1),
ws.send_json(item2))
async for msg in ws:
print("c")
print(msg)
async def ws_connect(item1, item2):
event = asyncio.Event()
task = asyncio.create_task(coro(event, item1, item2))
await event.wait() # wait until the event is set() to True, while waiting, block
return task
async def main():
item1 = {
"method": "subscribe",
"params": {'channel': "bar"}
}
item2 = {
"method": "subscribe",
"params": {'channel': "foo"}
}
ws_task = await ws_connect(item1, item2)
await ws_task
asyncio.run(main())
您错误地调用了 ws_connect
。正确方式:
async with aiohttp.ClientSession() as session:
async with session.ws_connect('url') as was:
...
完整示例:
import aiohttp
import asyncio
async def coro(event, item1, item2):
print("a")
async with aiohttp.ClientSession() as session:
async with session.ws_connect('wss://echo.websocket.org') as ws:
event.set()
print("b")
await asyncio.gather(ws.send_json(item1),
ws.send_json(item2))
async for msg in ws:
print("c")
print(msg)
async def ws_connect(item1, item2):
event = asyncio.Event()
task = asyncio.create_task(coro(event, item1, item2))
await event.wait() # wait until the event is set() to True, while waiting, block
return task
async def main():
item1 = {
"method": "subscribe",
"params": {'channel': "bar"}
}
item2 = {
"method": "subscribe",
"params": {'channel': "foo"}
}
ws_task = await ws_connect(item1, item2)
await ws_task
asyncio.run(main())