如何在 Python 3 中的多个异步方法之间创建依赖关系?

How can I create dependencies among multiple async methods in Python 3?

我有一个 class,其中包含多个异步方法,我想在它们之间创建依赖关系。 Dependency Graph 在这张图中,heartbeat 和 neighborhood_check 都依赖于 radio 而 sub 依赖于 neighborhood_check,这意味着在启动 radio 之后我想启动 heartbeat 和 neighborhood_check。 在启动 neighborhood_check 之后,我想启动 sub。我还有一个启动方法的 asyinc_start 函数,所以在这里我想管理这些方法,但我不能。

给我一些建议。

check_dependency.py的代码如下:

import asyncio
import sys
import os
from util import run`enter code here`
class ChkDependency():
    async def radio(self):
        print("Radio is initialized")
        await asyncio.sleep(2)


    async def pub(self):
        await asyncio.sleep(2)
        print("pub is initialized")


    async def heartbeat(self):
        print("heartbeat started")
        await asyncio.sleep(2)


    async def neigh_hood_check(self):
        await asyncio.sleep(2)
        print("checking for matches in neigh list")


    async def subs(self):
        await asyncio.sleep(2)
        print("match found")
        await asyncio.sleep(2)
        print("subscribing.....")


if __name__ == '__main__':
    chkdependency = ChkDependency()
    async def start():
        while True:
            await asyncio.wait([
                chkdependency.radio(),
                chkdependency.pub(),
                chkdependency.heartbeat(),
                chkdependency.neigh_hood_check(),
                chkdependency.subs(),
            ])    

    try:
        run(
            start(),
          )
    except KeyboardInterrupt:
        print("Exiting...")
        exit()

util.py的代码如下:

import asyncio

def run(*args):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*args))

我正在考虑使用信号量实现依赖关系,但没有积极的结果。 请帮忙!!

根据问题的描述,不清楚你想知道什么,所以我假设。

  1. 如果你只想让方法按照给定的顺序执行,那么 只需这样做:

    try:
        while True:
            await chkdependency.radio()
            await chkdependency.pub()
            await chkdependency.heartbeat()
            await chkdependency.neigh_hood_check()
            await chkdependency.subs()
    

    但在这种情况下,每个后续方法都将等待 完成上一篇!

  2. 如果要保证方法在指定时间启动 顺序,并以任何顺序结束,那么同步原语是 必不可少的:

    import asyncio
    from random import randint
    
    
    class ChkDependency:
        def __init__(self):
            self.radio_started = asyncio.Event()
            self.pub_started = asyncio.Event()
            self.heartbeat_started = asyncio.Event()
            self.neigh_hood_check_started = asyncio.Event()
            self.sub_started = asyncio.Event()
    
        async def radio(self):
            await self.radio_started.wait()
            print("1 Start radio")
            self.pub_started.set()
            await asyncio.sleep(randint(1, 5))
            print("1 Finish radio")
            self.radio_started.clear()
    
        async def pub(self):
            await self.pub_started.wait()
            print("2 Start pub")
            self.heartbeat_started.set()
            await asyncio.sleep(randint(1, 5))
            print("2 Finish pub")
            self.pub_started.clear()
    
        async def heartbeat(self):
            await self.heartbeat_started.wait()
            print("3 Start heartbeat")
            self.neigh_hood_check_started.set()
            await asyncio.sleep(randint(1, 5))
            print("3 Finish heartbeat")
            self.heartbeat_started.clear()
    
        async def neigh_hood_check(self):
            await self.neigh_hood_check_started.wait()
            print("4 Start neigh_hood_check")
            self.sub_started.set()
            await asyncio.sleep(randint(1, 5))
            print("4 Finish neigh_hood_check")
            self.neigh_hood_check_started.clear()
    
        async def subs(self):
            await self.sub_started.wait()
            print("5 Start subs")
            await asyncio.sleep(randint(1, 5))
            print("5 Finish subs")
            self.sub_started.clear()
    
        async def start(self):
            while True:
                self.radio_started.set()
                print("\nStart new cycle:")
                await asyncio.gather(
                    self.radio(),
                    self.pub(),
                    self.heartbeat(),
                    self.neigh_hood_check(),
                    self.subs()
                )
    
    
    async def main():
        chkdependency = ChkDependency()
        await chkdependency.start()
    
    if __name__ == '__main__':
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            print("Exiting...")
    
  3. 请记住,如果您这样做(或与 asyncio.wait 而不是 asyncio.gather):

    async def start():
        while True:
            await asyncio.gather(
                chkdependency.radio(),
                chkdependency.pub(),
                chkdependency.heartbeat(),
                chkdependency.neigh_hood_check(),
                chkdependency.subs()
            )
    

    在您看来,方法是依次开始的。那是因为 在内部,asyncio.gather 和 asyncio.wait 运行 它们都在一个循环中。 但是,请记住,这种行为是无法保证的!还有这个 意味着在任何时候,例如,在新版本的 库或出于某些内部原因,这些方法可以从 不同的顺序。

  4. 您似乎也不需要 运行 方法,因为 asyncio.run 一样!