异步循环:如何在现有 python 程序中实现异步 - 并共享 variables/data?

asyncio loops: how to implement asynio in an existing python program - and share variables/data?

我的应用程序需要通过 SSH 进行远程控制。 我想用这个例子:https://asyncssh.readthedocs.io/en/latest/#simple-server-with-input

原来的应用程序比较大,使用GPIO和600行代码,10个库。所以我在这里做了一个简单的例子:

import asyncio, asyncssh, sys, time
# here would be 10 libraries in the original 600line application
is_open = True
return_value = 0;

async def handle_client(process):
    process.stdout.write('Enter numbers one per line, or EOF when done:\n')
    process.stdout.write(is_open)

    total = 0

    try:
        async for line in process.stdin:
            line = line.rstrip('\n')
            if line:
                try:
                    total += int(line)
                except ValueError:
                    process.stderr.write('Invalid number: %s\n' % line)
    except asyncssh.BreakReceived:
        pass

    process.stdout.write('Total = %s\n' % total)
    process.exit(0)

async def start_server():
    await asyncssh.listen('', 8022, server_host_keys=['key'],
                          authorized_client_keys='key.pub',
                          process_factory=handle_client)

loop = asyncio.get_event_loop()

try:
    loop.run_until_complete(start_server())
except (OSError, asyncssh.Error) as exc:
    sys.exit('Error starting server: ' + str(exc))

loop.run_forever()


# here is the "old" program: that would not run now as loop.run_forever() runs.
#while True:
# print(return_value)
# time.sleep(0.1)

主应用程序主要由 while True 循环驱动,具有很多功能和休眠功能。 我已经在上面的简单示例中评论了该部分。

我的问题是:我应该如何实现使用 loop.run_forever() 的 SSH 部分 - 并且仍然能够 运行 我的主循环? 另外: handle_client(process) - 必须能够与主程序中的变量交互。 (read/write)

你基本上有三个选择:

重写你的主循环以兼容 asyncio

具有大量睡眠的 while True 主循环 正是 您想要异步编写的代码类型。转换为:

while True:
     task_1() # takes n ms
     sleep(0.2)
     task_2() # takes n ms
     sleep(0.4)

进入这个:

async def task_1():
    while True:
        stuff()
        await asyncio.sleep(0.6)

async def task_2():
    while True:
        stuff()
        await asyncio.sleep(0.01)
        other_stuff()
        await asyncio.sleep(0.8)

loop = asyncio.get_event_loop()
loop.add_task(task_1())
loop.add_task(task_2())
...
loop.run_forever()

这是最多的工作,但几乎可以肯定,如果将协程编写成一堆,你现在的代码会写得更好,更清晰,更容易维护,更容易开发。如果这样做,问题就会消失:通过协作式多任务处理,您可以告诉代码何时让步,因此共享状态通常非常容易。通过在获取和使用状态变量之间不等待任何事情,您可以防止竞争条件:不需要任何类型的线程安全变量。

运行 线程中的异步循环

保持当前循环不变,但 运行 在线程(或进程)中使用 threadingmultiprocessing 的 ascynio 循环。公开某种线程安全变量以允许后台线程更改状态,或转换到(线程安全)消息传递范例,其中 ssh 线程将消息发送到队列中,您的主循环会在自己的时间处理该队列(消息可能类似于 ("a", 5) 可以通过对队列中的所有内容执行类似 state_dict[msg[0]] == msg[1] 的操作来处理。

如果您想这样做,请查看 multiprocessing and/or threading 文档以了解在线程之间传递变量或消息的正确方法示例。请注意,此版本的性能可能不如纯 asyncio 解决方案,尤其是如果您的代码大部分时间都在主循环中休眠。

运行 你的同步代码在一个线程中,并且在前台有 asyncio

正如@MisterMiyagi 指出的那样,asyncio 有 loop.run_in_executor() 用于启动进程以 运行 阻塞代码。它通常用于 运行 代码的奇数阻塞位而不占用整个循环,但您可以 运行 整个主循环。对某种线程安全变量或消息共享的同样关注也适用。这具有将 asyncio 保持在预期位置的优势(正如@MisterMiyagi 指出的那样)。我有几个项目在一般非异步代码中使用后台异步线程(事件驱动的 gui 代码,异步线程通过 usb 与自定义硬件交互)。可以写,但是写的时候一定要注意。

顺便说一句,如果您决定使用多线程,消息传递(使用队列)通常比直接共享变量更容易。