在 Tornado 的阻塞上下文中调用异步函数
Call async functions in blocking context in Tornado
我想在Tornado框架中实现一个基于web sockets的服务。当用户关闭网络套接字时,我想通知其他用户这件事。但是,on_close
显然是一个阻塞函数,而我的 _broadcast(str) -> None
函数是异步的。
我怎样才能调用这个函数?
from tornado import websocket
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SocketHandler(websocket.WebSocketHandler):
async def open(self, *args, conns, **kwargs):
logger.info(f"Opened a new connection to client {id(self)}")
self._conns = conns
async def on_message(self, message):
logger.info(f"Client {id(self)} sent message: {message}")
await self._broadcast(message)
def on_close(self):
logger.info(f"Client {id(self)} has left the scene")
self._conns.remove(self)
self._broadcast("something") # TODO
async def _broadcast(self, msg):
for conn in self._conns:
try:
await conn.write_message(msg)
except websocket.WebSocketClosedError:
pass
app = web.Application([
(r'/ws', SocketHandler)
])
if __name__ == '__main__':
app.listen(9000)
ioloop.IOLoop.instance().start()
我认为涉及使用 asyncio.Queue
的解决方案应该适合您。
我做了一个小的 class 作为模型来测试这个:
import asyncio
import time
class Thing:
on_close_q = asyncio.Queue()
def __init__(self):
self.conns = range(3)
def on_close(self, id):
time.sleep(id)
print(f'closing {id}')
self.on_close_q.put_nowait((self, id))
async def notify(self, msg):
print('in notify')
for conn in range(3):
print(f'notifying {conn} {msg}')
async def monitor_on_close():
print('monitoring')
while True:
instance, id = await Thing.on_close_q.get()
await instance.notify(f'{id} is closed')
从那里,你需要 运行 monitor_on_close
在你从龙卷风得到的 ioloop
中。我从未使用过龙卷风,但我认为将类似这样的内容添加到您的 __main__
块应该有效:
ioloop.IOLoop.current().add_callback(monitor_on_close)
您正在寻找的简单解决方案是在调用协程时使用 asyncio.create_task
:
def on_close(self):
logger.info(f"Client {id(self)} has left the scene")
self._conns.remove(self)
asyncio.create_task(self._broadcast("something"))
(此函数的遗留 Tornado 版本是 tornado.gen.convert_yielded
,但现在 Tornado 和 asyncio 已集成,没有理由不为原生协程使用 asyncio 版本)
但是对于这个特殊的问题,在你的 _broadcast
函数中使用 await
并不理想。等待 write_message
用于提供流量控制,但 create_task
对 await
提供的背压没有任何用处。 (write_message
非常不寻常,因为完全支持使用和不使用 await
调用它)。事实上,它对错误的事情施加背压 - 一个慢速连接会减慢对所有其他连接的通知。
所以在这种情况下,我建议将 _broadcast
设为常规同步函数:
def _broadcast(self, msg):
for conn in self._conns:
try:
conn.write_message(msg)
except websocket.WebSocketClosedError:
pass
如果你想更好地控制内存使用(通过 await write_message
提供的流量控制),你将需要一个更复杂的解决方案,可能涉及每个连接的有界队列(在 on_close
,使用 put_nowait
将消息添加到每个连接的队列,然后有一个任务从队列中读取并使用 await write_message
)
写入消息
我想在Tornado框架中实现一个基于web sockets的服务。当用户关闭网络套接字时,我想通知其他用户这件事。但是,on_close
显然是一个阻塞函数,而我的 _broadcast(str) -> None
函数是异步的。
我怎样才能调用这个函数?
from tornado import websocket
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SocketHandler(websocket.WebSocketHandler):
async def open(self, *args, conns, **kwargs):
logger.info(f"Opened a new connection to client {id(self)}")
self._conns = conns
async def on_message(self, message):
logger.info(f"Client {id(self)} sent message: {message}")
await self._broadcast(message)
def on_close(self):
logger.info(f"Client {id(self)} has left the scene")
self._conns.remove(self)
self._broadcast("something") # TODO
async def _broadcast(self, msg):
for conn in self._conns:
try:
await conn.write_message(msg)
except websocket.WebSocketClosedError:
pass
app = web.Application([
(r'/ws', SocketHandler)
])
if __name__ == '__main__':
app.listen(9000)
ioloop.IOLoop.instance().start()
我认为涉及使用 asyncio.Queue
的解决方案应该适合您。
我做了一个小的 class 作为模型来测试这个:
import asyncio
import time
class Thing:
on_close_q = asyncio.Queue()
def __init__(self):
self.conns = range(3)
def on_close(self, id):
time.sleep(id)
print(f'closing {id}')
self.on_close_q.put_nowait((self, id))
async def notify(self, msg):
print('in notify')
for conn in range(3):
print(f'notifying {conn} {msg}')
async def monitor_on_close():
print('monitoring')
while True:
instance, id = await Thing.on_close_q.get()
await instance.notify(f'{id} is closed')
从那里,你需要 运行 monitor_on_close
在你从龙卷风得到的 ioloop
中。我从未使用过龙卷风,但我认为将类似这样的内容添加到您的 __main__
块应该有效:
ioloop.IOLoop.current().add_callback(monitor_on_close)
您正在寻找的简单解决方案是在调用协程时使用 asyncio.create_task
:
def on_close(self):
logger.info(f"Client {id(self)} has left the scene")
self._conns.remove(self)
asyncio.create_task(self._broadcast("something"))
(此函数的遗留 Tornado 版本是 tornado.gen.convert_yielded
,但现在 Tornado 和 asyncio 已集成,没有理由不为原生协程使用 asyncio 版本)
但是对于这个特殊的问题,在你的 _broadcast
函数中使用 await
并不理想。等待 write_message
用于提供流量控制,但 create_task
对 await
提供的背压没有任何用处。 (write_message
非常不寻常,因为完全支持使用和不使用 await
调用它)。事实上,它对错误的事情施加背压 - 一个慢速连接会减慢对所有其他连接的通知。
所以在这种情况下,我建议将 _broadcast
设为常规同步函数:
def _broadcast(self, msg):
for conn in self._conns:
try:
conn.write_message(msg)
except websocket.WebSocketClosedError:
pass
如果你想更好地控制内存使用(通过 await write_message
提供的流量控制),你将需要一个更复杂的解决方案,可能涉及每个连接的有界队列(在 on_close
,使用 put_nowait
将消息添加到每个连接的队列,然后有一个任务从队列中读取并使用 await write_message
)