Asyncio + aiohttp - redis Pub/Sub and websocket read/write in single handler
I am currently playing with aiohttp to see how it performs as the server application for a mobile app that uses websocket connections.

Here is a simple "Hello world" example (as gist here):
import asyncio
import aiohttp
from aiohttp import web


class WebsocketEchoHandler:

    @asyncio.coroutine
    def __call__(self, request):
        ws = web.WebSocketResponse()
        ws.start(request)

        print('Connection opened')
        try:
            while True:
                msg = yield from ws.receive()
                ws.send_str(msg.data + '/answer')
        except:
            pass
        finally:
            print('Connection closed')
        return ws


if __name__ == "__main__":
    app = aiohttp.web.Application()
    app.router.add_route('GET', '/ws', WebsocketEchoHandler())

    loop = asyncio.get_event_loop()
    handler = app.make_handler()

    f = loop.create_server(
        handler,
        '127.0.0.1',
        8080,
    )

    srv = loop.run_until_complete(f)
    print("Server started at {sock[0]}:{sock[1]}".format(
        sock=srv.sockets[0].getsockname()
    ))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(handler.finish_connections(1.0))
        srv.close()
        loop.run_until_complete(srv.wait_closed())
        loop.run_until_complete(app.finish())
        loop.close()
The problem

Now I would like to use the structure described below (node server = python aiohttp). More specifically, use the Redis Pub/Sub mechanism with asyncio-redis to both read and write to the websocket connection and to Redis in my WebsocketEchoHandler.

WebsocketEchoHandler is a dead simple loop, so I am not sure how this should be done. With Tornado and brükva I would just use callbacks.

Extra (possibly off-topic) question

Since I am already using Redis, which of the two approaches should I take:
- Like in a "classic" web app, have a controller/view for everything and use Redis only for messaging etc.
- The web app should be only a layer between clients and Redis, also used as a task queue (simplest: Python RQ). Every request should be delegated to workers (a rough sketch of this option follows the list).
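For the second option, here is a minimal illustration of the "thin layer + task queue" idea (my sketch, not from the original post), assuming python-rq, a local Redis instance, and a hypothetical worker function tasks.process_request:

# Illustration only: the web layer just enqueues work for an RQ worker.
# `tasks.process_request` is a hypothetical worker function.
from redis import Redis
from rq import Queue

queue = Queue(connection=Redis())  # default queue on localhost:6379


def delegate(payload):
    # An `rq worker` process picks this up, runs tasks.process_request(payload),
    # and can publish its result back to clients via Redis Pub/Sub.
    return queue.enqueue('tasks.process_request', payload)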
EDIT

Image from http://goldfirestudios.com/blog/136/Horizontally-Scaling-Node.js-and-WebSockets-with-Redis

EDIT 2

It seems I need to clarify.

- The websocket-only handler is shown above
- The Redis Pub/Sub-only handler might look like this:
class WebsocketEchoHandler:

    @asyncio.coroutine
    def __call__(self, request):
        ws = web.WebSocketResponse()
        ws.start(request)

        connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
        subscriber = yield from connection.start_subscribe()
        yield from subscriber.subscribe(['ch1', 'ch2'])

        print('Connection opened')
        try:
            while True:
                msg = yield from subscriber.next_published()
                ws.send_str(msg.value + '/answer')
        except:
            pass
        finally:
            print('Connection closed')
        return ws
This handler just subscribes to the Redis channels ch1 and ch2 and sends every message received from those channels to the websocket.

I would like to have this handler:
class WebsocketEchoHandler:

    @asyncio.coroutine
    def __call__(self, request):
        ws = web.WebSocketResponse()
        ws.start(request)

        connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
        subscriber = yield from connection.start_subscribe()
        yield from subscriber.subscribe(['ch1', 'ch2'])

        print('Connection opened')
        try:
            while True:
                # If a message is received from Redis OR from the websocket
                msg_ws = yield from ws.receive()
                msg_redis = yield from subscriber.next_published()
                if msg_ws:
                    # push to Redis / do something else
                    self.on_msg_from_ws(msg_ws)
                if msg_redis:
                    self.on_msg_from_redis(msg_redis)
        except:
            pass
        finally:
            print('Connection closed')
        return ws
But the following code is always called sequentially, so reading from the websocket blocks reading from Redis:

    msg_ws = yield from ws.receive()
    msg_redis = yield from subscriber.next_published()

I would like the read to be done on an event, where the event is a message received from either of the two sources.
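One way to express "whichever source fires first" directly (my sketch, not from the original post, reusing the same ws and subscriber objects as above) is asyncio.wait with return_when=FIRST_COMPLETED:

@asyncio.coroutine
def read_either(ws, subscriber):
    # Start both reads and return the first message that arrives.
    ws_read = asyncio.ensure_future(ws.receive())
    redis_read = asyncio.ensure_future(subscriber.next_published())
    done, pending = yield from asyncio.wait(
        [ws_read, redis_read],
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()  # drop the read that has not completed yet
    return done.pop().result()

Note that cancelling the still-pending read can be subtle (a message may be lost depending on the library), which is one reason the answer below prefers two independent loops.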
You should use two while loops - one that handles messages from the websocket, and one that handles messages from Redis. Your main handler can kick off two coroutines, one handling each loop, and then wait on both of them:
class WebsocketEchoHandler:

    @asyncio.coroutine
    def __call__(self, request):
        ws = web.WebSocketResponse()
        ws.start(request)

        connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
        subscriber = yield from connection.start_subscribe()
        yield from subscriber.subscribe(['ch1', 'ch2'])

        print('Connection opened')
        try:
            # Kick off both coroutines in parallel, and then block
            # until both are completed.
            yield from asyncio.gather(self.handle_ws(ws), self.handle_redis(subscriber))
        except Exception as e:  # Don't do except: pass
            import traceback
            traceback.print_exc()
        finally:
            print('Connection closed')
        return ws

    @asyncio.coroutine
    def handle_ws(self, ws):
        while True:
            msg_ws = yield from ws.receive()
            if msg_ws:
                self.on_msg_from_ws(msg_ws)

    @asyncio.coroutine
    def handle_redis(self, subscriber):
        while True:
            msg_redis = yield from subscriber.next_published()
            if msg_redis:
                self.on_msg_from_redis(msg_redis)
That way you can read from either of the two potential sources without having to care about the other one.
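One practical detail the answer above does not cover (my addition, not part of the original answer): as written, neither loop ever exits, so the gather() call never returns even after the client disconnects. Assuming a recent aiohttp where the message type is exposed as msg.type / WSMsgType, handle_ws could break out on close:

@asyncio.coroutine
def handle_ws(self, ws):
    while True:
        msg_ws = yield from ws.receive()
        # Stop when the peer goes away instead of looping forever.
        if msg_ws.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
            break
        self.on_msg_from_ws(msg_ws)

Once handle_ws returns you would still have to cancel the handle_redis task (for example via asyncio.wait(..., return_when=FIRST_COMPLETED) as sketched earlier), because gather() itself keeps waiting for both coroutines.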
More recently, in Python 3.5 and above, we can use async/await:
import asyncio
import json

from aiohttp import WSMsgType


async def task1(ws):
    # Reply to every websocket text frame with 'pong'
    async for msg in ws:
        if msg.type == WSMsgType.TEXT:
            data = msg.data
            print(data)
            if data:
                await ws.send_str('pong')


async def task2(ch):
    # ch is a Redis pub/sub channel; r_cons maps user tokens to open websockets
    async for msg in ch.iter(encoding="utf-8", decoder=json.loads):
        print("receiving", msg)
        user_token = msg['token']
        if user_token in r_cons.keys():
            _ws = r_cons[user_token]
            await _ws.send_json(msg)


coroutines = list()
coroutines.append(task1(ws))
coroutines.append(task2(ch1))

await asyncio.gather(*coroutines)
This is what I do when a websocket needs to wait for messages from multiple sources.

The main point here is to use asyncio.gather to run the two coroutines together, as @dano mentioned above.
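For completeness, here is a hedged, self-contained version of the same pattern as a modern aiohttp handler (my sketch, assuming aioredis 1.x, whose pub/sub Channel API the snippet above also appears to use; the echo behaviour mirrors the question's original handler):

import asyncio
import aioredis
from aiohttp import web, WSMsgType


async def ws_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    redis = await aioredis.create_redis('redis://127.0.0.1:6379')
    ch1, = await redis.subscribe('ch1')

    async def read_ws():
        # Echo websocket text frames back, as in the original example.
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                await ws.send_str(msg.data + '/answer')

    async def read_redis():
        # Forward every message published on ch1 to this websocket.
        async for msg in ch1.iter(encoding='utf-8'):
            await ws.send_str(msg)

    tasks = [asyncio.ensure_future(read_ws()),
             asyncio.ensure_future(read_redis())]
    try:
        # Run both readers concurrently; when either finishes
        # (e.g. the client disconnects), cancel the other one.
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
    finally:
        redis.close()
    return ws


app = web.Application()
app.router.add_get('/ws', ws_handler)
# web.run_app(app, host='127.0.0.1', port=8080)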