Python 队列链接对象 运行 asyncio 协程与主线程输入
Python queue linking object running asyncio coroutines with main thread input
我有一个脚本 运行ning,其中主线程从标准输入获取输入,然后使用队列将其传递给子线程。在子线程中,我使用 asyncio 协同程序在套接字上启动侦听器并等待连接。建立连接后,我现在可以从主线程通过侦听器发送数据。
一切似乎都运行良好,但由于 asyncio.BaseEventLoop 不是线程安全的,我是否会 运行 遇到问题?
这是我尝试解决使用像 python 的带有 asyncio 的 cmd 模块的阻塞库的问题。
我的代码如下。
import sys
import asyncio
from time import sleep
from threading import Thread
from queue import Queue
stdin_q = Queue()
clients = {} # task -> (reader, writer)
def client_connected_handler(client_reader, client_writer):
# Start a new asyncio.Task to handle this specific client connection
task = asyncio.Task(handle_client(client_reader, client_writer))
clients[task] = (client_reader, client_writer)
def client_done(task):
# When the tasks that handles the specific client connection is done
del clients[task]
# Add the client_done callback to be run when the future becomes done
task.add_done_callback(client_done)
@asyncio.coroutine
def handle_client(client_reader, client_writer):
# Handle the requests for a specific client with a line oriented protocol
while True:
cmd = yield from get_input()
client_writer.write(cmd.encode())
data = yield from client_reader.read(1024)
print(data.decode(),end="",flush=True)
@asyncio.coroutine
def get_input():
while True:
try:
return stdin_q.get()
except:
pass
class Control:
def start(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.loop = asyncio.get_event_loop()
server = self.loop.run_until_complete(asyncio.start_server(client_connected_handler, '0.0.0.0', 2222))
self.loop.run_forever()
self.stop()
def stop(self):
self.loop.stop()
self.loop.close()
def fire_control():
con = Control()
con.start()
if __name__ == "__main__":
stdin_q.put("\n")
t = Thread(target=fire_control)
t.start()
sleep(2)
_cmd = ""
while _cmd.lower() != "exit":
_cmd = input("")
if _cmd == "":
_cmd = "\r\n"
stdin_q.put(_cmd)
这不会很好地工作,因为对 stdin_q.get()
的调用将阻止您的事件循环。这意味着如果您的服务器有多个客户端,那么所有客户端都将被第一个到达 stdin_q.get()
的客户端完全阻塞,直到您将数据发送到队列中。解决这个问题的最简单方法是在后台 ThreadPoolExecutor
中使用 BaseEvent.loop.run_in_executor
到 运行 stdin_q.get
,这允许您等待它而不会阻塞事件循环:
@asyncio.coroutine
def get_input():
loop = asyncio.get_event_loop()
return (yield from loop.run_in_executor(None, stdin_q.get)) # None == use default executor.
编辑 (1/27/16):
有一个名为 janus
的库,它提供了一个异步友好、线程安全的队列实现。
使用该库,您的代码将如下所示(我省略了未更改的部分):
...
import janus
loop = asyncio.new_event_loop()
stdin_q = janus.Queue(loop=loop)
...
@asyncio.coroutine
def get_input():
loop = asyncio.get_event_loop()
return (yield from stdin_q.async_q.get())
class Control:
def start(self):
asyncio.set_event_loop(loop)
self.loop = asyncio.get_event_loop()
server = self.loop.run_until_complete(asyncio.start_server(client_connected_handler, '0.0.0.0', 2222))
self.loop.run_forever()
self.stop()
def stop(self):
self.loop.stop()
self.loop.close()
...
if __name__ == "__main__":
stdin_q.sync_q.put("\n")
t = Thread(target=runner)
t.start()
sleep(2)
_cmd = ""
while _cmd.lower() != "exit":
_cmd = input("")
if _cmd == "":
_cmd = "\r\n"
stdin_q.sync_q.put(_cmd)
我有一个脚本 运行ning,其中主线程从标准输入获取输入,然后使用队列将其传递给子线程。在子线程中,我使用 asyncio 协同程序在套接字上启动侦听器并等待连接。建立连接后,我现在可以从主线程通过侦听器发送数据。
一切似乎都运行良好,但由于 asyncio.BaseEventLoop 不是线程安全的,我是否会 运行 遇到问题?
这是我尝试解决使用像 python 的带有 asyncio 的 cmd 模块的阻塞库的问题。
我的代码如下。
import sys
import asyncio
from time import sleep
from threading import Thread
from queue import Queue
stdin_q = Queue()
clients = {} # task -> (reader, writer)
def client_connected_handler(client_reader, client_writer):
# Start a new asyncio.Task to handle this specific client connection
task = asyncio.Task(handle_client(client_reader, client_writer))
clients[task] = (client_reader, client_writer)
def client_done(task):
# When the tasks that handles the specific client connection is done
del clients[task]
# Add the client_done callback to be run when the future becomes done
task.add_done_callback(client_done)
@asyncio.coroutine
def handle_client(client_reader, client_writer):
# Handle the requests for a specific client with a line oriented protocol
while True:
cmd = yield from get_input()
client_writer.write(cmd.encode())
data = yield from client_reader.read(1024)
print(data.decode(),end="",flush=True)
@asyncio.coroutine
def get_input():
while True:
try:
return stdin_q.get()
except:
pass
class Control:
def start(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.loop = asyncio.get_event_loop()
server = self.loop.run_until_complete(asyncio.start_server(client_connected_handler, '0.0.0.0', 2222))
self.loop.run_forever()
self.stop()
def stop(self):
self.loop.stop()
self.loop.close()
def fire_control():
con = Control()
con.start()
if __name__ == "__main__":
stdin_q.put("\n")
t = Thread(target=fire_control)
t.start()
sleep(2)
_cmd = ""
while _cmd.lower() != "exit":
_cmd = input("")
if _cmd == "":
_cmd = "\r\n"
stdin_q.put(_cmd)
这不会很好地工作,因为对 stdin_q.get()
的调用将阻止您的事件循环。这意味着如果您的服务器有多个客户端,那么所有客户端都将被第一个到达 stdin_q.get()
的客户端完全阻塞,直到您将数据发送到队列中。解决这个问题的最简单方法是在后台 ThreadPoolExecutor
中使用 BaseEvent.loop.run_in_executor
到 运行 stdin_q.get
,这允许您等待它而不会阻塞事件循环:
@asyncio.coroutine
def get_input():
loop = asyncio.get_event_loop()
return (yield from loop.run_in_executor(None, stdin_q.get)) # None == use default executor.
编辑 (1/27/16):
有一个名为 janus
的库,它提供了一个异步友好、线程安全的队列实现。
使用该库,您的代码将如下所示(我省略了未更改的部分):
...
import janus
loop = asyncio.new_event_loop()
stdin_q = janus.Queue(loop=loop)
...
@asyncio.coroutine
def get_input():
loop = asyncio.get_event_loop()
return (yield from stdin_q.async_q.get())
class Control:
def start(self):
asyncio.set_event_loop(loop)
self.loop = asyncio.get_event_loop()
server = self.loop.run_until_complete(asyncio.start_server(client_connected_handler, '0.0.0.0', 2222))
self.loop.run_forever()
self.stop()
def stop(self):
self.loop.stop()
self.loop.close()
...
if __name__ == "__main__":
stdin_q.sync_q.put("\n")
t = Thread(target=runner)
t.start()
sleep(2)
_cmd = ""
while _cmd.lower() != "exit":
_cmd = input("")
if _cmd == "":
_cmd = "\r\n"
stdin_q.sync_q.put(_cmd)