具有多个服务器的 Asyncio 循环
Asyncio loop with several servers
我有以下代码片段,应该 运行 一个 websockets 服务器 + 一个 NATS 服务器。
每次新消息到达 NATS 时,websocket 服务器应该将消息发送到所有连接的 websockets。我看到一个奇怪的行为,因为 websocket 连接没有保持活动状态。
我想我在 asyncio 循环中搞混了。知道我在这里遗漏了什么吗?
import asyncio
import os
import logging
import json
import websockets
from nats.aio.client import Client as NATS
async def main():
#
# websocket related stuff
#
clients = set()
# Register each new client
async def register(websocket, path):
clients.add(websocket)
await websockets.serve(register, "localhost", 8765)
# NATS related stuff
# Send message to all connected websockets
async def message_handler(msg):
data = msg.data.decode()
logging.debug('handling new message: {}'.format(data))
for ws in clients:
await ws.send("test")
# Connection to NATS message queue
nats_url = os.getenv('NATS_URL', 'nats://nats:4222')
nc = NATS()
await nc.connect([nats_url])
# Subscription to all data related messages
await nc.subscribe("data.*", cb=message_handler)
await asyncio.Event().wait()
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main())
每次有新的websocket客户端连接,注册后连接关闭:
DEBUG:websockets.protocol:server - state = CONNECTING
DEBUG:websockets.protocol:server - event = connection_made(<_SelectorSocketTransport fd=6 read=idle write=<idle, bufsize=0>>)
DEBUG:websockets.protocol:server - event = data_received(<535 bytes>)
DEBUG:websockets.server:server < GET /?token=234RZRZER HTTP/1.1
DEBUG:websockets.server:server < Headers([('Host', '127.0.0.1:8765'), ('Connection', 'Upgrade'), ('Pragma', 'no-cache'), ('Cache-Control', 'no-cache'), ('Upgrade', 'websocket'), ('Origin', 'file://'), ('Sec-WebSocket-Version', '13'), ('User-Agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_4_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36'), ('Accept-Encoding', 'gzip, deflate, br'), ('Accept-Language', 'fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7'), ('Sec-GPC', '1'), ('Sec-WebSocket-Key', 'KL0hFYOjOc25n/XwcON/9A=='), ('Sec-WebSocket-Extensions', 'permessage-deflate; client_max_window_bits')])
DEBUG:websockets.server:server > HTTP/1.1 101 Switching Protocols
DEBUG:websockets.server:server > Headers([('Upgrade', 'websocket'), ('Connection', 'Upgrade'), ('Sec-WebSocket-Accept', 'JpQAUwE+IzhPyNdqQuLeyROFhGo='), ('Sec-WebSocket-Extensions', 'permessage-deflate'), ('Date', 'Wed, 04 Aug 2021 13:21:40 GMT'), ('Server', 'Python/3.8 websockets/9.1')])
DEBUG:websockets.protocol:server - state = OPEN
DEBUG:websockets.protocol:server - state = CLOSING
DEBUG:websockets.protocol:server > Frame(fin=True, opcode=<Opcode.CLOSE: 8>, data=b'\x03\xe8', rsv1=False, rsv2=False, rsv3=False)
DEBUG:websockets.protocol:server - event = data_received(<8 bytes>)
DEBUG:websockets.protocol:server < Frame(fin=True, opcode=<Opcode.CLOSE: 8>, data=b'\x03\xe8', rsv1=False, rsv2=False, rsv3=False)
DEBUG:websockets.protocol:server x half-closing TCP connection
DEBUG:websockets.protocol:server - event = eof_received()
DEBUG:websockets.protocol:server - event = connection_lost(None)
DEBUG:websockets.protocol:server - state = CLOSED
DEBUG:websockets.protocol:server x code = 1000, reason = [no reason]
您可能想要这样的东西 – 两个任务,一个为 websockets 服务,另一个用于监听消息,以及一个连接两者的队列。
这里可能有一些愚蠢的错误,因为我没有 NATS/websocket 可以用来测试的设置。
import asyncio
import os
import logging
import websockets
from nats.aio.client import Client as NATS
async def boot_server(
stop_signal: asyncio.Event, message_queue: asyncio.Queue
):
clients = set()
async def register(websocket, path):
# Register client
clients.add(websocket)
try:
# Wait forever for messages
async for message in websocket:
print(websocket, message)
finally:
try:
clients.remove(websocket)
except Exception:
pass
async with websockets.serve(register, "localhost", 8765):
while not stop_signal.is_set():
# TODO: there's a small bug here in that the stop signal is only checked
# after a message has been processed
msg = await message_queue.get()
for client in clients:
# TODO: should probably add error tolerance in this loop
# (i.e. if one send fails, others are still sent)
await client.send(msg)
async def listen_nats(message_queue: asyncio.Queue):
# Connection to NATS message queue
nats_url = os.getenv("NATS_URL", "nats://nats:4222")
nc = NATS()
await nc.connect([nats_url])
async def message_handler(msg):
data = msg.data.decode()
logging.debug("handling new message: {}".format(data))
await message_queue.put(data)
# Subscription to all data related messages
await nc.subscribe("data.*", cb=message_handler)
# TODO: figure out how to close the NATS connection?
async def main():
stop_signal = asyncio.Event()
message_queue = asyncio.Queue()
ws_server_task = asyncio.create_task(
boot_server(stop_signal, message_queue)
)
nats_task = asyncio.create_task(listen_nats(message_queue))
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
stop_signal.set()
await ws_server_task
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main())
我有以下代码片段,应该 运行 一个 websockets 服务器 + 一个 NATS 服务器。
每次新消息到达 NATS 时,websocket 服务器应该将消息发送到所有连接的 websockets。我看到一个奇怪的行为,因为 websocket 连接没有保持活动状态。
我想我在 asyncio 循环中搞混了。知道我在这里遗漏了什么吗?
import asyncio
import os
import logging
import json
import websockets
from nats.aio.client import Client as NATS
async def main():
#
# websocket related stuff
#
clients = set()
# Register each new client
async def register(websocket, path):
clients.add(websocket)
await websockets.serve(register, "localhost", 8765)
# NATS related stuff
# Send message to all connected websockets
async def message_handler(msg):
data = msg.data.decode()
logging.debug('handling new message: {}'.format(data))
for ws in clients:
await ws.send("test")
# Connection to NATS message queue
nats_url = os.getenv('NATS_URL', 'nats://nats:4222')
nc = NATS()
await nc.connect([nats_url])
# Subscription to all data related messages
await nc.subscribe("data.*", cb=message_handler)
await asyncio.Event().wait()
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main())
每次有新的websocket客户端连接,注册后连接关闭:
DEBUG:websockets.protocol:server - state = CONNECTING
DEBUG:websockets.protocol:server - event = connection_made(<_SelectorSocketTransport fd=6 read=idle write=<idle, bufsize=0>>)
DEBUG:websockets.protocol:server - event = data_received(<535 bytes>)
DEBUG:websockets.server:server < GET /?token=234RZRZER HTTP/1.1
DEBUG:websockets.server:server < Headers([('Host', '127.0.0.1:8765'), ('Connection', 'Upgrade'), ('Pragma', 'no-cache'), ('Cache-Control', 'no-cache'), ('Upgrade', 'websocket'), ('Origin', 'file://'), ('Sec-WebSocket-Version', '13'), ('User-Agent', 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_4_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.192 Safari/537.36'), ('Accept-Encoding', 'gzip, deflate, br'), ('Accept-Language', 'fr-FR,fr;q=0.9,en-US;q=0.8,en;q=0.7'), ('Sec-GPC', '1'), ('Sec-WebSocket-Key', 'KL0hFYOjOc25n/XwcON/9A=='), ('Sec-WebSocket-Extensions', 'permessage-deflate; client_max_window_bits')])
DEBUG:websockets.server:server > HTTP/1.1 101 Switching Protocols
DEBUG:websockets.server:server > Headers([('Upgrade', 'websocket'), ('Connection', 'Upgrade'), ('Sec-WebSocket-Accept', 'JpQAUwE+IzhPyNdqQuLeyROFhGo='), ('Sec-WebSocket-Extensions', 'permessage-deflate'), ('Date', 'Wed, 04 Aug 2021 13:21:40 GMT'), ('Server', 'Python/3.8 websockets/9.1')])
DEBUG:websockets.protocol:server - state = OPEN
DEBUG:websockets.protocol:server - state = CLOSING
DEBUG:websockets.protocol:server > Frame(fin=True, opcode=<Opcode.CLOSE: 8>, data=b'\x03\xe8', rsv1=False, rsv2=False, rsv3=False)
DEBUG:websockets.protocol:server - event = data_received(<8 bytes>)
DEBUG:websockets.protocol:server < Frame(fin=True, opcode=<Opcode.CLOSE: 8>, data=b'\x03\xe8', rsv1=False, rsv2=False, rsv3=False)
DEBUG:websockets.protocol:server x half-closing TCP connection
DEBUG:websockets.protocol:server - event = eof_received()
DEBUG:websockets.protocol:server - event = connection_lost(None)
DEBUG:websockets.protocol:server - state = CLOSED
DEBUG:websockets.protocol:server x code = 1000, reason = [no reason]
您可能想要这样的东西 – 两个任务,一个为 websockets 服务,另一个用于监听消息,以及一个连接两者的队列。
这里可能有一些愚蠢的错误,因为我没有 NATS/websocket 可以用来测试的设置。
import asyncio
import os
import logging
import websockets
from nats.aio.client import Client as NATS
async def boot_server(
stop_signal: asyncio.Event, message_queue: asyncio.Queue
):
clients = set()
async def register(websocket, path):
# Register client
clients.add(websocket)
try:
# Wait forever for messages
async for message in websocket:
print(websocket, message)
finally:
try:
clients.remove(websocket)
except Exception:
pass
async with websockets.serve(register, "localhost", 8765):
while not stop_signal.is_set():
# TODO: there's a small bug here in that the stop signal is only checked
# after a message has been processed
msg = await message_queue.get()
for client in clients:
# TODO: should probably add error tolerance in this loop
# (i.e. if one send fails, others are still sent)
await client.send(msg)
async def listen_nats(message_queue: asyncio.Queue):
# Connection to NATS message queue
nats_url = os.getenv("NATS_URL", "nats://nats:4222")
nc = NATS()
await nc.connect([nats_url])
async def message_handler(msg):
data = msg.data.decode()
logging.debug("handling new message: {}".format(data))
await message_queue.put(data)
# Subscription to all data related messages
await nc.subscribe("data.*", cb=message_handler)
# TODO: figure out how to close the NATS connection?
async def main():
stop_signal = asyncio.Event()
message_queue = asyncio.Queue()
ws_server_task = asyncio.create_task(
boot_server(stop_signal, message_queue)
)
nats_task = asyncio.create_task(listen_nats(message_queue))
try:
while True:
await asyncio.sleep(1)
except KeyboardInterrupt:
stop_signal.set()
await ws_server_task
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main())