异步循环:如何在现有 python 程序中实现异步 - 并共享 variables/data?
asyncio loops: how to implement asynio in an existing python program - and share variables/data?
我的应用程序需要通过 SSH 进行远程控制。
我想用这个例子:https://asyncssh.readthedocs.io/en/latest/#simple-server-with-input
原来的应用程序比较大,使用GPIO和600行代码,10个库。所以我在这里做了一个简单的例子:
import asyncio, asyncssh, sys, time
# here would be 10 libraries in the original 600line application
is_open = True
return_value = 0;
async def handle_client(process):
process.stdout.write('Enter numbers one per line, or EOF when done:\n')
process.stdout.write(is_open)
total = 0
try:
async for line in process.stdin:
line = line.rstrip('\n')
if line:
try:
total += int(line)
except ValueError:
process.stderr.write('Invalid number: %s\n' % line)
except asyncssh.BreakReceived:
pass
process.stdout.write('Total = %s\n' % total)
process.exit(0)
async def start_server():
await asyncssh.listen('', 8022, server_host_keys=['key'],
authorized_client_keys='key.pub',
process_factory=handle_client)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start_server())
except (OSError, asyncssh.Error) as exc:
sys.exit('Error starting server: ' + str(exc))
loop.run_forever()
# here is the "old" program: that would not run now as loop.run_forever() runs.
#while True:
# print(return_value)
# time.sleep(0.1)
主应用程序主要由 while True 循环驱动,具有很多功能和休眠功能。
我已经在上面的简单示例中评论了该部分。
我的问题是:我应该如何实现使用 loop.run_forever()
的 SSH 部分 - 并且仍然能够 运行 我的主循环?
另外: handle_client(process) - 必须能够与主程序中的变量交互。 (read/write)
你基本上有三个选择:
重写你的主循环以兼容 asyncio
具有大量睡眠的 while True
主循环 正是 您想要异步编写的代码类型。转换为:
while True:
task_1() # takes n ms
sleep(0.2)
task_2() # takes n ms
sleep(0.4)
进入这个:
async def task_1():
while True:
stuff()
await asyncio.sleep(0.6)
async def task_2():
while True:
stuff()
await asyncio.sleep(0.01)
other_stuff()
await asyncio.sleep(0.8)
loop = asyncio.get_event_loop()
loop.add_task(task_1())
loop.add_task(task_2())
...
loop.run_forever()
这是最多的工作,但几乎可以肯定,如果将协程编写成一堆,你现在的代码会写得更好,更清晰,更容易维护,更容易开发。如果这样做,问题就会消失:通过协作式多任务处理,您可以告诉代码何时让步,因此共享状态通常非常容易。通过在获取和使用状态变量之间不等待任何事情,您可以防止竞争条件:不需要任何类型的线程安全变量。
运行 线程中的异步循环
保持当前循环不变,但 运行 在线程(或进程)中使用 threading
或 multiprocessing
的 ascynio 循环。公开某种线程安全变量以允许后台线程更改状态,或转换到(线程安全)消息传递范例,其中 ssh 线程将消息发送到队列中,您的主循环会在自己的时间处理该队列(消息可能类似于 ("a", 5)
可以通过对队列中的所有内容执行类似 state_dict[msg[0]] == msg[1]
的操作来处理。
如果您想这样做,请查看 multiprocessing
and/or threading
文档以了解在线程之间传递变量或消息的正确方法示例。请注意,此版本的性能可能不如纯 asyncio 解决方案,尤其是如果您的代码大部分时间都在主循环中休眠。
运行 你的同步代码在一个线程中,并且在前台有 asyncio
正如@MisterMiyagi 指出的那样,asyncio 有 loop.run_in_executor()
用于启动进程以 运行 阻塞代码。它通常用于 运行 代码的奇数阻塞位而不占用整个循环,但您可以 运行 整个主循环。对某种线程安全变量或消息共享的同样关注也适用。这具有将 asyncio 保持在预期位置的优势(正如@MisterMiyagi 指出的那样)。我有几个项目在一般非异步代码中使用后台异步线程(事件驱动的 gui 代码,异步线程通过 usb 与自定义硬件交互)。可以写,但是写的时候一定要注意。
顺便说一句,如果您决定使用多线程,消息传递(使用队列)通常比直接共享变量更容易。
我的应用程序需要通过 SSH 进行远程控制。 我想用这个例子:https://asyncssh.readthedocs.io/en/latest/#simple-server-with-input
原来的应用程序比较大,使用GPIO和600行代码,10个库。所以我在这里做了一个简单的例子:
import asyncio, asyncssh, sys, time
# here would be 10 libraries in the original 600line application
is_open = True
return_value = 0;
async def handle_client(process):
process.stdout.write('Enter numbers one per line, or EOF when done:\n')
process.stdout.write(is_open)
total = 0
try:
async for line in process.stdin:
line = line.rstrip('\n')
if line:
try:
total += int(line)
except ValueError:
process.stderr.write('Invalid number: %s\n' % line)
except asyncssh.BreakReceived:
pass
process.stdout.write('Total = %s\n' % total)
process.exit(0)
async def start_server():
await asyncssh.listen('', 8022, server_host_keys=['key'],
authorized_client_keys='key.pub',
process_factory=handle_client)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(start_server())
except (OSError, asyncssh.Error) as exc:
sys.exit('Error starting server: ' + str(exc))
loop.run_forever()
# here is the "old" program: that would not run now as loop.run_forever() runs.
#while True:
# print(return_value)
# time.sleep(0.1)
主应用程序主要由 while True 循环驱动,具有很多功能和休眠功能。 我已经在上面的简单示例中评论了该部分。
我的问题是:我应该如何实现使用 loop.run_forever()
的 SSH 部分 - 并且仍然能够 运行 我的主循环?
另外: handle_client(process) - 必须能够与主程序中的变量交互。 (read/write)
你基本上有三个选择:
重写你的主循环以兼容 asyncio
具有大量睡眠的 while True
主循环 正是 您想要异步编写的代码类型。转换为:
while True:
task_1() # takes n ms
sleep(0.2)
task_2() # takes n ms
sleep(0.4)
进入这个:
async def task_1():
while True:
stuff()
await asyncio.sleep(0.6)
async def task_2():
while True:
stuff()
await asyncio.sleep(0.01)
other_stuff()
await asyncio.sleep(0.8)
loop = asyncio.get_event_loop()
loop.add_task(task_1())
loop.add_task(task_2())
...
loop.run_forever()
这是最多的工作,但几乎可以肯定,如果将协程编写成一堆,你现在的代码会写得更好,更清晰,更容易维护,更容易开发。如果这样做,问题就会消失:通过协作式多任务处理,您可以告诉代码何时让步,因此共享状态通常非常容易。通过在获取和使用状态变量之间不等待任何事情,您可以防止竞争条件:不需要任何类型的线程安全变量。
运行 线程中的异步循环
保持当前循环不变,但 运行 在线程(或进程)中使用 threading
或 multiprocessing
的 ascynio 循环。公开某种线程安全变量以允许后台线程更改状态,或转换到(线程安全)消息传递范例,其中 ssh 线程将消息发送到队列中,您的主循环会在自己的时间处理该队列(消息可能类似于 ("a", 5)
可以通过对队列中的所有内容执行类似 state_dict[msg[0]] == msg[1]
的操作来处理。
如果您想这样做,请查看 multiprocessing
and/or threading
文档以了解在线程之间传递变量或消息的正确方法示例。请注意,此版本的性能可能不如纯 asyncio 解决方案,尤其是如果您的代码大部分时间都在主循环中休眠。
运行 你的同步代码在一个线程中,并且在前台有 asyncio
正如@MisterMiyagi 指出的那样,asyncio 有 loop.run_in_executor()
用于启动进程以 运行 阻塞代码。它通常用于 运行 代码的奇数阻塞位而不占用整个循环,但您可以 运行 整个主循环。对某种线程安全变量或消息共享的同样关注也适用。这具有将 asyncio 保持在预期位置的优势(正如@MisterMiyagi 指出的那样)。我有几个项目在一般非异步代码中使用后台异步线程(事件驱动的 gui 代码,异步线程通过 usb 与自定义硬件交互)。可以写,但是写的时候一定要注意。
顺便说一句,如果您决定使用多线程,消息传递(使用队列)通常比直接共享变量更容易。