运行 在单独的线程中同步异步代码
Running asynchronous code synchronously in separate thread
我正在使用 Django Channels 来支持 websockets 并使用他们的组概念向同一组中的多个消费者广播消息。为了向消费者外部发送消息,您需要在同步代码中调用异步方法。不幸的是,这在测试时会出现问题。
我开始使用 loop.run_until_complete
:
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(channel_layer.group_send(group_name, {'text': json.dumps(message),
'type': 'receive_group_json'}),
loop=loop))
然后堆栈跟踪读取线程没有事件循环:RuntimeError: There is no current event loop in thread 'Thread-1'.
。为了解决这个问题,我补充说:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(asyncio.ensure_future(channel_layer.group_send(group_name, {'text': json.dumps(message),
'type': 'receive_group_json'}),
loop=loop))
现在堆栈跟踪正在读取 RuntimeError: Event loop is closed
,尽管如果我添加打印语句 loop.is_closed()
会打印 False
.
对于上下文,我使用的是 Django 2.0、Channels 2 和 redis 后端。
更新:我在Python解释器中尝试了运行这个(在py.test之外删除移动变量)。当我 运行 第二个代码块时,我没有收到 Event loop is closed
错误(这可能是由于 Pytest 端的某些原因,无论是超时等)。但是,我没有在客户端收到群组消息。但是,我确实看到了打印语句:
({<Task finished coro=<RedisChannelLayer.group_send() done, defined at /Users/my/path/to/venv/lib/python3.6/site-packages/channels_redis/core.py:306> result=None>}, set())
更新 2:刷新 redis 后,我在 py.test 中添加了一个固定装置,为每个函数以及会话范围的事件循环刷新它。这次从 RedisChannelLayer 产生另一个打印:
({<Task finished coro=<RedisChannelLayer.group_send() done, defined at /Users/my/path/to/venv/lib/python3.6/site-packages/channels_redis/core.py:306> exception=RuntimeError('Task <Task pending coro=<RedisChannelLayer.group_send() running at /Users/my/path/to/venv/lib/python3.6/site-packages/channels_redis/core.py:316>> got Future <Future pending> attached to a different loop',)>}, set())
如果 channel_layer
希望驻留在另一个线程中它自己的事件循环中,您将需要获取该事件循环对象。一旦你有了它,你就可以向它提交协程并与你的线程同步,就像这样:
def wait_for_coro(coro, loop):
# submit coroutine to the event loop in the other thread
# and wait for it to complete
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.wait()
wait_for_coro(channel_layer.group_send(group_name, ...), channel_loop)
默认只有主线程获取事件循环,在其他线程中调用get_event_loop
会失败。
如果您需要另一个线程中的事件循环——例如处理 HTTP 或 WebSockets 请求的线程——您需要使用 new_event_loop
自行实现。之后您可以使用 set_event_loop
并且将来的 get_event_loop
调用将起作用。我这样做:
# get or create an event loop for the current thread
def get_thread_event_loop():
try:
loop = asyncio.get_event_loop() # gets previously set event loop, if possible
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
我正在使用 Django Channels 来支持 websockets 并使用他们的组概念向同一组中的多个消费者广播消息。为了向消费者外部发送消息,您需要在同步代码中调用异步方法。不幸的是,这在测试时会出现问题。
我开始使用 loop.run_until_complete
:
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.ensure_future(channel_layer.group_send(group_name, {'text': json.dumps(message),
'type': 'receive_group_json'}),
loop=loop))
然后堆栈跟踪读取线程没有事件循环:RuntimeError: There is no current event loop in thread 'Thread-1'.
。为了解决这个问题,我补充说:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(asyncio.ensure_future(channel_layer.group_send(group_name, {'text': json.dumps(message),
'type': 'receive_group_json'}),
loop=loop))
现在堆栈跟踪正在读取 RuntimeError: Event loop is closed
,尽管如果我添加打印语句 loop.is_closed()
会打印 False
.
对于上下文,我使用的是 Django 2.0、Channels 2 和 redis 后端。
更新:我在Python解释器中尝试了运行这个(在py.test之外删除移动变量)。当我 运行 第二个代码块时,我没有收到 Event loop is closed
错误(这可能是由于 Pytest 端的某些原因,无论是超时等)。但是,我没有在客户端收到群组消息。但是,我确实看到了打印语句:
({<Task finished coro=<RedisChannelLayer.group_send() done, defined at /Users/my/path/to/venv/lib/python3.6/site-packages/channels_redis/core.py:306> result=None>}, set())
更新 2:刷新 redis 后,我在 py.test 中添加了一个固定装置,为每个函数以及会话范围的事件循环刷新它。这次从 RedisChannelLayer 产生另一个打印:
({<Task finished coro=<RedisChannelLayer.group_send() done, defined at /Users/my/path/to/venv/lib/python3.6/site-packages/channels_redis/core.py:306> exception=RuntimeError('Task <Task pending coro=<RedisChannelLayer.group_send() running at /Users/my/path/to/venv/lib/python3.6/site-packages/channels_redis/core.py:316>> got Future <Future pending> attached to a different loop',)>}, set())
如果 channel_layer
希望驻留在另一个线程中它自己的事件循环中,您将需要获取该事件循环对象。一旦你有了它,你就可以向它提交协程并与你的线程同步,就像这样:
def wait_for_coro(coro, loop):
# submit coroutine to the event loop in the other thread
# and wait for it to complete
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.wait()
wait_for_coro(channel_layer.group_send(group_name, ...), channel_loop)
默认只有主线程获取事件循环,在其他线程中调用get_event_loop
会失败。
如果您需要另一个线程中的事件循环——例如处理 HTTP 或 WebSockets 请求的线程——您需要使用 new_event_loop
自行实现。之后您可以使用 set_event_loop
并且将来的 get_event_loop
调用将起作用。我这样做:
# get or create an event loop for the current thread
def get_thread_event_loop():
try:
loop = asyncio.get_event_loop() # gets previously set event loop, if possible
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop