如何在 python 中实现给定两个协程,保持 运行 一个直到另一个完成

How to implement in python given two coroutine, keep run one until the other finished

python asyncio lib 的新手,一直在努力实现保持活动状态的任务。我想同时 运行 一个 cpu 密集型任务和一个保活任务。 keep alive 应该 运行 定期直到 cpu 密集完成。

import asyncio
import time

async def cpu_intensive():
    print("cpu_intensive for 3 seconds")
    delay = 3
    close_time = time.time() + delay
    while True:
        if time.time() > close_time:
            break


async def keep_alive():
    print("keep alive for 1 second") # My real use case is I want to send a heart beat message every x seconds until cpu intensive finished
    await asyncio.sleep(1)

async def main():
    cpu_intensive_task = asyncio.create_task(cpu_intensive())
    keep_alive_task = asyncio.create_task(keep_alive())
    print(f"Started at {time.strftime('%X')}")
    # TODO: Not sure how to achieve the expected output
    print(f"Finished at {time.strftime('%X')}")

asyncio.run(main())

'''
Expected Output
Started at 23:55:08
cpu_intensive 3 seconds
keep alive for 1 seconds
keep alive for 1 seconds
keep alive for 1 seconds
Finished at 23:55:11
'''

我浏览了 asyncio 库 python 并尝试了几个 API,例如 awaitrun_coroutine_threadsafeasyncio.gather。但是无法让它工作。

您是否尝试过使用线程? 你可以像这样轻松实现它。

import threading
import time
from datetime import datetime

def cpu_intensive():
    time.sleep(10) #this is the time consuming task
    print("CPU INTENSTIVE TASK DONE")

def keep_alive():
    now = datetime.now() # current date and time
    date_time = now.strftime("%m/%d/%Y, %H:%M:%S")
    print(f"{date_time}  -- BUSY STATUS / SIGNAL CODE")

#Starting the cpu_intensive task
thread = threading.Thread(target=cpu_intensive)
now = datetime.now() # current date and time
date_time = now.strftime("%m/%d/%Y, %H:%M:%S")
print("STARTING CPU PROCESS: ", date_time) 
thread.start()

# Doing something while the task is alive
while thread.is_alive():
    keep_alive()
    time.sleep(1)
thread.join()
print("TASK COMPLETE")

输出应如下所示

STARTING CPU PROCESS:  11/18/2021, 14:53:05
11/18/2021, 14:53:05  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:06  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:07  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:08  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:09  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:10  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:11  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:12  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:13  -- BUSY STATUS / SIGNAL CODE
11/18/2021, 14:53:14  -- BUSY STATUS / SIGNAL CODE
CPU INTENSTIVE TASK DONE
TASK COMPLETE

您也可以切换线程以保持 keep_alive。

我认为您可能混淆了并发和并行的概念。写一些我在玩asyncio时的理解:

实现并发的两种方式

  • Parallel: 'Physical' 并发,在同一点上可能同时执行超过 1 行代码。

    • 图书馆: multiprocessing
    • pro: 可以利用Multiple core
    • con:占用一些资源来创建进程。与进程通信的高开销(使用 pickle 序列化)。工作负载必须是线程安全的。
  • Asynchronous(await/async): 'Perceived' 并发,在任何给定时间执行的代码不能超过 1 行,但通过上下文切换实现并发。使用 await 关键字允许上下文更改。

    • 图书馆: asyncio curio trio
    • pro: 比同步代码更能利用一个内核。很轻巧。控制流比线程更可预测。 (上下文切换仅发生在 await 关键字上。)
    • con: 不能利用Multiple核心。在任何给定时间不能 运行 超过 1 个代码。无法在繁重的工作负载中切换上下文。
  • Asynchronous(time division):又名线程。由于 GIL,python 中的线程在任何给定时间只能执行 1 行代码。因此与上述有相似之处。

    • 图书馆threading
    • pro: 比同步代码更能利用一个内核。 (因为它 运行 等待其他代码。)非常轻量级。由于它使用时间分割方法,因此即使在 CPU 繁重的工作量下也可以 运行。 (通过短暂停止工作负载并执行其他线程)
    • con: 不能利用Multiple核心。在任何给定时间不能 运行 超过 1 个代码。控制流很难预测。

因此,对于任何 CPU 密集型工作负载,最好并行化。

对于任何 IO 绑定工作负载(也称为等待),最好异步编码 - 因为无论如何我们都不需要利用更多内核。

代码修复

异步

您需要 awaitcpu_intensive 协程中做一些事情。

如图所示 我们可以使用 yield asyncio.sleep(0) 在工作负载中添加上下文切换点。当然,这不是编写异步代码的理想方式,但如果您需要将此类功能附加到异步代码,这是一种方式。

import asyncio
import time


async def cpu_intensive():
    print("cpu_intensive for 3 seconds")
    duration = 3
    close_time = time.time() + duration
    while True:
        if time.time() > close_time:
            break

        await asyncio.sleep(0)


async def keep_alive():
    while True:
        print("keep alive for 1 second")
        await asyncio.sleep(1)


async def main():
    print(f"Started at {time.strftime('%X')}")

    cpu_intensive_task = asyncio.create_task(cpu_intensive())
    asyncio.create_task(keep_alive())

    await cpu_intensive_task

    print(f"Finished at {time.strftime('%X')}")


asyncio.run(main())

"""
Started at 04:13:09
cpu_intensive for 3 seconds
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
Finished at 04:13:12
"""

还有一个保持活动状态,因为它先检查条件然后等待 1 秒。

请注意 asyncio.sleep 实际上并不是在等待准确的给定时间。把它想象成“在我睡觉的时候做任何你想做的事,但一定要在 X 秒后给我打电话。”


P.S.

稍后,您会像我一样意识到 instability, hard error handling or inconsistency of asyncio and stumble upon to trio,对于这种情况,我将留下三重奏的示例。

三重奏

import trio
import time


class TaskDoneException(Exception):
    pass


async def cpu_intensive():
    print("cpu_intensive for 3 seconds")
    duration = 3
    close_time = time.time() + duration
    while True:
        if time.time() > close_time:
            raise TaskDoneException()

        await trio.sleep(0)


async def keep_alive():
    while True:
        print("keep alive for 1 second")
        await trio.sleep(1)


async def main():
    try:
        async with trio.open_nursery() as nursery:

            print(f"Started at {time.strftime('%X')}")
            nursery.start_soon(cpu_intensive)
            nursery.start_soon(keep_alive)
    except TaskDoneException:
        print(f"Finished at {time.strftime('%X')}")


trio.run(main)


'''
Output:
Started at 17:43:45
keep alive for 1 second
cpu_intensive for 3 seconds
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
Finished at 17:43:48
'''