是否可以在事件循环已经 运行ning 时 运行 asyncio.Server 实例
Is it possible to run the asyncio.Server instance while the event loop is already running
我正在尝试了解,是否可以在事件循环已经 运行 时通过 run_forever
方法 运行 asyncio.Server
实例(来自当然是单独的线程)。
据我了解,服务器可以通过 loop.run_until_complete(asyncio.start_server(...))
或
await asyncio.start_server(...)
,如果循环已经 运行ning。
第一种方法对我来说是不可接受的,因为循环已经通过 run_forever 方法 运行ning。但我也不能使用 await 表达式,因为我将从 "loop area" 之外启动它(例如,从不能标记为异步的 main 方法,对吧?)
def loop_thread(loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
loop.close()
print("loop clesed")
class SchedulerTestManager:
def __init__(self):
...
self.loop = asyncio.get_event_loop()
self.servers_loop_thread = threading.Thread(
target=loop_thread, args=(self.loop, ))
...
def start_test(self):
self.servers_loop_thread.start()
return self.servers_loop_thread
def add_router(self, router):
r = self.endpoint.add_router(router)
host = router.ConnectionParameters.Host
port = router.ConnectionParameters.Port
srv = TcpServer(host, port)
server_coro = asyncio.start_server(
self.handle_connection, self.host, self.port)
# does not work since add_router is not async
# self.server = await server_coro
# does not work, since the loop is already running
# self.server = self.loop.run_until_complete(server_coro)
return r
def maind():
st_manager = SchedulerTestManager()
thread = st_manager.start_test()
router = st_manager.add_router(router)
当然,最简单的解决方案是在开始测试之前添加所有路由器(服务器)(运行循环)。但我想尝试实现它,这样就可以在测试已经 运行ning 时添加路由器。我认为 loop.call_soon
(call_soon_threadsafe
) 方法可以帮助我,但似乎不能调度协程,而只是一个简单的函数。
希望我的解释不是很混乱。提前致谢!
为了在一个线程中执行的事件循环与在另一线程中执行的传统旧线程代码之间进行通信,您可以使用 janus 库。
这是一个有两个接口的队列:异步接口和线程安全同步接口。
这是用法示例:
import asyncio
import janus
loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
def threaded(sync_q):
for i in range(100):
sync_q.put(i)
sync_q.join()
@asyncio.coroutine
def async_coro(async_q):
for i in range(100):
val = yield from async_q.get()
assert val == i
async_q.task_done()
fut = loop.run_in_executor(None, threaded, queue.sync_q)
loop.run_until_complete(async_coro(queue.async_q))
loop.run_until_complete(fut)
您可以创建一个任务,在循环中等待来自队列的新消息,并根据请求启动新服务器。其他线程可能会将新消息推送到请求新服务器的队列中。
我正在尝试了解,是否可以在事件循环已经 运行 时通过 run_forever
方法 运行 asyncio.Server
实例(来自当然是单独的线程)。
据我了解,服务器可以通过 loop.run_until_complete(asyncio.start_server(...))
或
await asyncio.start_server(...)
,如果循环已经 运行ning。
第一种方法对我来说是不可接受的,因为循环已经通过 run_forever 方法 运行ning。但我也不能使用 await 表达式,因为我将从 "loop area" 之外启动它(例如,从不能标记为异步的 main 方法,对吧?)
def loop_thread(loop):
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
loop.close()
print("loop clesed")
class SchedulerTestManager:
def __init__(self):
...
self.loop = asyncio.get_event_loop()
self.servers_loop_thread = threading.Thread(
target=loop_thread, args=(self.loop, ))
...
def start_test(self):
self.servers_loop_thread.start()
return self.servers_loop_thread
def add_router(self, router):
r = self.endpoint.add_router(router)
host = router.ConnectionParameters.Host
port = router.ConnectionParameters.Port
srv = TcpServer(host, port)
server_coro = asyncio.start_server(
self.handle_connection, self.host, self.port)
# does not work since add_router is not async
# self.server = await server_coro
# does not work, since the loop is already running
# self.server = self.loop.run_until_complete(server_coro)
return r
def maind():
st_manager = SchedulerTestManager()
thread = st_manager.start_test()
router = st_manager.add_router(router)
当然,最简单的解决方案是在开始测试之前添加所有路由器(服务器)(运行循环)。但我想尝试实现它,这样就可以在测试已经 运行ning 时添加路由器。我认为 loop.call_soon
(call_soon_threadsafe
) 方法可以帮助我,但似乎不能调度协程,而只是一个简单的函数。
希望我的解释不是很混乱。提前致谢!
为了在一个线程中执行的事件循环与在另一线程中执行的传统旧线程代码之间进行通信,您可以使用 janus 库。
这是一个有两个接口的队列:异步接口和线程安全同步接口。
这是用法示例:
import asyncio
import janus
loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
def threaded(sync_q):
for i in range(100):
sync_q.put(i)
sync_q.join()
@asyncio.coroutine
def async_coro(async_q):
for i in range(100):
val = yield from async_q.get()
assert val == i
async_q.task_done()
fut = loop.run_in_executor(None, threaded, queue.sync_q)
loop.run_until_complete(async_coro(queue.async_q))
loop.run_until_complete(fut)
您可以创建一个任务,在循环中等待来自队列的新消息,并根据请求启动新服务器。其他线程可能会将新消息推送到请求新服务器的队列中。