使用 asyncio 使 websocket 回调异步
Make websocket callback asynchronous with asyncio
我正在尝试使用 asyncio and websockets 和 Python 3.5.2.
来实现一个基本的 websocket 客户端
基本上,我希望 connect_to_dealer
成为一个阻塞调用,但在不同的线程上等待 websocket 消息。
阅读一些文档后(我对 Python 的经验很少),我得出结论 asyncio.ensure_future()
传递协程 (listen_for_message
) 是可行的方法。
现在,我在另一个线程上到达 运行 listen_for_message
,但是在协同程序中我似乎无法使用 await
或任何其他机制来进行调用同步。如果我这样做,即使是一个简单的 sleep
.
,执行也会永远等待(挂起)
我想知道我做错了什么。
async def listen_for_message(self, future, websocket):
while (True):
try:
await asyncio.sleep(1) # It hangs here
print('Listening for a message...')
message = await websocket.recv() # If I remove the sleep, hangs here
print("< {}".format(message))
future.set_result(message)
future.done()
except websockets.ConnectionClosed as cc:
print('Connection closed')
except Exception as e:
print('Something happened')
def handle_connect_message(self, future):
# We must first remove the websocket-specific payload because we're only interested in the connect protocol msg
print(future.result)
async def connect_to_dealer(self):
print('connect to dealer')
websocket = await websockets.connect('wss://mywebsocket'))
hello_message = await websocket.recv()
print("< {}".format(hello_message))
# We need to parse the connection ID out of the message
connection_id = hello_message['connectionId']
print('Got connection id {}'.format(connection_id))
sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(user_id='username', connection_id=connection_id), headers=headers)
if sub_response.status_code == 200:
print('Now we\'re observing traffic')
else:
print('Oops request failed with {code}'.format(code=sub_response.status_code))
# Now we need to handle messages but continue with the regular execution
try:
future = asyncio.get_event_loop().create_future()
future.add_done_callback(self.handle_connect_message)
asyncio.ensure_future(self.listen_for_message(future, websocket))
except Exception as e:
print(e)
是否有特定原因需要使用显式期货?
通过 asyncio
,您可以结合使用 coroutines
和 Tasks
来实现大多数目的。任务本质上是包装好的协程,它们在后台自行启动,独立于其他异步代码,因此您不必显式管理它们的流程或将它们与其他代码混在一起。
我不完全确定您的最终目标,但也许下面阐述的方法可以为您提供一些帮助:
import asyncio
async def listen_for_message(websocket):
while True:
await asyncio.sleep(0)
try:
print('Listening for a message...')
message = await websocket.recv()
print("< {}".format(message))
except websockets.ConnectionClosed as cc:
print('Connection closed')
except Exception as e:
print('Something happened')
async def connect_to_dealer():
print('connect to dealer')
websocket = await websockets.connect('wss://mywebsocket')
hello_message = await websocket.recv()
print("< {}".format(hello_message))
# We need to parse the connection ID out of the message
connection_id = hello_message['connectionId']
print('Got connection id {}'.format(connection_id))
sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(
user_id='username', connection_id=connection_id), headers=headers)
if sub_response.status_code == 200:
print('Now we\'re observing traffic')
else:
print('Oops request failed with {code}'.format(code=sub_response.status_code))
async def my_app():
# this will block until connect_to_dealer() returns
websocket = await connect_to_dealer()
# start listen_for_message() in its own task wrapper, so doing it continues in the background
asyncio.ensure_future(listen_for_message(websocket))
# you can continue with other code here that can now coexist with listen_for_message()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(my_app())
loop.run_forever()
我正在尝试使用 asyncio and websockets 和 Python 3.5.2.
来实现一个基本的 websocket 客户端基本上,我希望 connect_to_dealer
成为一个阻塞调用,但在不同的线程上等待 websocket 消息。
阅读一些文档后(我对 Python 的经验很少),我得出结论 asyncio.ensure_future()
传递协程 (listen_for_message
) 是可行的方法。
现在,我在另一个线程上到达 运行 listen_for_message
,但是在协同程序中我似乎无法使用 await
或任何其他机制来进行调用同步。如果我这样做,即使是一个简单的 sleep
.
我想知道我做错了什么。
async def listen_for_message(self, future, websocket):
while (True):
try:
await asyncio.sleep(1) # It hangs here
print('Listening for a message...')
message = await websocket.recv() # If I remove the sleep, hangs here
print("< {}".format(message))
future.set_result(message)
future.done()
except websockets.ConnectionClosed as cc:
print('Connection closed')
except Exception as e:
print('Something happened')
def handle_connect_message(self, future):
# We must first remove the websocket-specific payload because we're only interested in the connect protocol msg
print(future.result)
async def connect_to_dealer(self):
print('connect to dealer')
websocket = await websockets.connect('wss://mywebsocket'))
hello_message = await websocket.recv()
print("< {}".format(hello_message))
# We need to parse the connection ID out of the message
connection_id = hello_message['connectionId']
print('Got connection id {}'.format(connection_id))
sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(user_id='username', connection_id=connection_id), headers=headers)
if sub_response.status_code == 200:
print('Now we\'re observing traffic')
else:
print('Oops request failed with {code}'.format(code=sub_response.status_code))
# Now we need to handle messages but continue with the regular execution
try:
future = asyncio.get_event_loop().create_future()
future.add_done_callback(self.handle_connect_message)
asyncio.ensure_future(self.listen_for_message(future, websocket))
except Exception as e:
print(e)
是否有特定原因需要使用显式期货?
通过 asyncio
,您可以结合使用 coroutines
和 Tasks
来实现大多数目的。任务本质上是包装好的协程,它们在后台自行启动,独立于其他异步代码,因此您不必显式管理它们的流程或将它们与其他代码混在一起。
我不完全确定您的最终目标,但也许下面阐述的方法可以为您提供一些帮助:
import asyncio
async def listen_for_message(websocket):
while True:
await asyncio.sleep(0)
try:
print('Listening for a message...')
message = await websocket.recv()
print("< {}".format(message))
except websockets.ConnectionClosed as cc:
print('Connection closed')
except Exception as e:
print('Something happened')
async def connect_to_dealer():
print('connect to dealer')
websocket = await websockets.connect('wss://mywebsocket')
hello_message = await websocket.recv()
print("< {}".format(hello_message))
# We need to parse the connection ID out of the message
connection_id = hello_message['connectionId']
print('Got connection id {}'.format(connection_id))
sub_response = requests.put('https://subscribetotraffic{user_id}?connection={connection_id}'.format(
user_id='username', connection_id=connection_id), headers=headers)
if sub_response.status_code == 200:
print('Now we\'re observing traffic')
else:
print('Oops request failed with {code}'.format(code=sub_response.status_code))
async def my_app():
# this will block until connect_to_dealer() returns
websocket = await connect_to_dealer()
# start listen_for_message() in its own task wrapper, so doing it continues in the background
asyncio.ensure_future(listen_for_message(websocket))
# you can continue with other code here that can now coexist with listen_for_message()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(my_app())
loop.run_forever()