How to manage a dynamic list of tasks with asyncio

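I have a main class that creates an instance from each entry in a list of plugins. In the example below, the 'dhcpv6' plugin is started and listens for messages on the "subject_test_subscribe" subject. Whenever the handler intercepts a new message on that subject, it creates a new asyncio task that runs "Dhcpv6_child().run()", which represents a child of the class in Dhcpv6.py. Each child has a timeout that terminates it. For information, the Plugin class defined in __init__.py is an abstract class that allows the plugins to be loaded.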

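I can't manage to add new tasks dynamically with asyncio.gather: a new task is not executed until the current one has finished. Using asyncio.create_subprocess_exec solved the problem, but I would prefer to stay within what asyncio itself offers.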

main.py

import argparse
import asyncio
import importlib

parser = argparse.ArgumentParser()
parser.add_argument('--nats', '-n', nargs="?", type=str, required=True, help="adresse IP du serveur NATS")
parser.add_argument('--plugins', '-p', nargs="+", help="Liste de plugins a utiliser")
args = parser.parse_args()

print("************** main() *****************")
print(f"NATS server IP: {args.nats}")
print(f"List of plugins to load: {args.plugins}")

async def main():
    tasks = []
    for type_plugin in args.plugins:
        try:
            module = importlib.import_module(f'Plugins.{type_plugin}')
            my_class = getattr(module, type_plugin)
            my_instance = my_class(nats=args.nats)
            tasks.append(asyncio.create_task(my_instance.run()))
        except Exception as e:
            print("Erreur chargement du plugin : ", type_plugin, ":",  e)

    try:
        await asyncio.gather(*tasks)
    except asyncio.TimeoutError:
        print("timeout main")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.create_task(main())
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(e)
    finally:
        loop.close()
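
A side note on the entry point above: with loop.create_task(main()) followed by loop.run_forever(), the process keeps running even after main() has returned. A minimal sketch of the same start-up using asyncio.run() is shown here. The plugin_run() coroutine is a hypothetical stand-in for my_instance.run(), and the plugin names are only examples.

```python
import asyncio


async def plugin_run(name):
    # Hypothetical stand-in for my_instance.run() of a loaded plugin.
    await asyncio.sleep(0.01)
    return f"{name} stopped"


async def main(plugin_names):
    tasks = [asyncio.create_task(plugin_run(name)) for name in plugin_names]
    # return_exceptions=True collects a failing plugin's exception as a result
    # instead of propagating it while the other plugins are still running.
    return await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    try:
        # asyncio.run() creates the loop, runs main() to completion, closes the loop.
        print(asyncio.run(main(["Dhcpv6"])))
    except KeyboardInterrupt:
        pass
```

With this form the program exits once every plugin's run() has returned, which may or may not be what is wanted for a long-running service.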


Dhcpv6.py

import asyncio
import json
import logging
import sys

from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
from Plugins import Plugin
from Plugins.Dhcpv6_child import Dhcpv6_child

logger = logging.getLogger("dhcpv6")
logging.basicConfig(level=logging.DEBUG)


class Dhcpv6(Plugin):
    name = "Dhcpv6 plugin"
    subject = "subject_test_subscribe"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.nats = kwargs.get('nats')
        self.list_task = []
        self.timeout_task = 5
        print("Class dhcpv6 ==> constructor /  nats_ip : {}".format(self.nats))


    async def run(self):
        print("******* run DHCPV6 ***********")
        nc = NATS()
        try:
            await nc.connect("127.0.0.1", verbose=True, pedantic=True)
        except ErrConnectionClosed:
            print(f"La connexion a ete fermee inopinement")
            return
        except ErrTimeout:
            print(f"Le delai imparti a la connexion est depasse")
            return
        except ErrNoServers:
            print(f"Aucun serveur n'a repondu a temps")
            return
        except Exception as e:
            print(f"Exception inattendue: {e}")
            return

        async def plugin_handler(msg):
            print(msg)
            try:
                # Creating and running a new task on demand
                self.list_task.append(asyncio.wait_for(
                    asyncio.create_task(Dhcpv6_child().run()), timeout=self.timeout_task))
                print("append a new task : ", self.list_task)
            except:
                print("Error append new task")

            try:
                print("Running a new task : ", self.list_task)
                await asyncio.wait_for(asyncio.gather(*self.list_task), timeout=3600.0)
            except asyncio.TimeoutError:
                print("Fin du main : timeout atteint")


        print(f"Subscribing test on : {self.subject}")
        await nc.subscribe(f"{self.subject}", cb=plugin_handler)

        while nc.is_connected:
            await asyncio.sleep(0.5)
        await nc.drain()

Here is the output:

************** main() *****************
NATS server IP: 127.0.0.1
List of plugins to load: : ['Dhcpv6']


------------ Load a list of plugins : {Dhcpv6} -------
Class plugin ==> constructor
Class dhcpv6 ==> constructor /  nats_ip : 127.0.0.1

******* run DHCPV6 ***********
INFO:Plugin:Plugin Dhcpv6 plugin loaded
Subscribing test on : subject_test_subscribe
<Msg: subject='subject_test_subscribe' reply='' data='{"query": ...'>

Class CHILD ==> constructor
append a new task :  [<coroutine object wait_for at 0x0000026D0310F040>]
Running a new task :  [<coroutine object wait_for at 0x0000026D0310F040>]
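
The behaviour described above is consistent with plugin_handler awaiting asyncio.gather() itself: assuming the client delivers the messages of one subscription to the callback one at a time, the handler does not return until every child has finished, so the next message is not handled. A minimal sketch of the alternative follows, in which the handler only schedules the child and returns. The child() coroutine, the CHILD_TIMEOUT value and the direct handler() calls are hypothetical stand-ins for Dhcpv6_child().run(), self.timeout_task and the NATS callback.

```python
import asyncio

CHILD_TIMEOUT = 0.2  # hypothetical per-child timeout, in seconds


async def child(name, duration, log):
    # Stand-in for Dhcpv6_child().run().
    log.append(f"{name} started")
    await asyncio.sleep(duration)
    log.append(f"{name} finished")


async def run_with_timeout(name, duration, log):
    # The timeout is applied inside the task, so nobody has to await it from outside.
    try:
        await asyncio.wait_for(child(name, duration, log), timeout=CHILD_TIMEOUT)
    except asyncio.TimeoutError:
        log.append(f"{name} timed out")


async def demo():
    log = []
    tasks = []

    async def handler(name, duration):
        # Schedule the child and return at once; do NOT await gather() here.
        task = asyncio.create_task(run_with_timeout(name, duration, log))
        tasks.append(task)  # keep a reference so the task is not garbage-collected

    await handler("child-1", 0.05)
    await handler("child-2", 1.0)   # longer than CHILD_TIMEOUT
    await handler("child-3", 0.05)

    # Only the demo waits for the children; a real plugin would keep serving messages.
    await asyncio.gather(*tasks)
    return log


if __name__ == "__main__":
    for line in asyncio.run(demo()):
        print(line)
```

All three children start before any of them finishes, and the slow one is stopped by its own timeout without delaying the others.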

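OK, Cornelius, thanks for your suggestion. You are right: asyncio.create_task() allows tasks to be created on demand. If I wanted to process the tasks one after another, I would need to add asyncio.wait(self.list_task), but that is not my goal. Below is how I create a new child in "Dhcpv6.py". For information, the main() routine called through asyncio.create_task() is imported from the child script "Dhcpv6_child.py":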

        if mac not in self.list_mac:
            self.list_task.append(asyncio.create_task(main(self.nats,
                                            self.min_inventory['name'],
                                            self.min_inventory,
                                            str(datetime.now()),
                                            str(datetime.now() + self.timeout))))
            self.date_start.append(datetime.now())
            self.date_end.append(datetime.now() + self.timeout)
            self.lb_name.append(self.min_inventory['name'])
            self.list_mac.append(mac)
            # logger.info(f"{self.lb_name} {task.get_name()} created")
            await nc.subscribe(f"call.dhcpv6.{self.min_inventory['name']}", cb=cancel_tasks)
        else:
            logger.warning(f"[{__name__}] Task address %s already in list_mac", mac)
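
The cancel_tasks callback is not shown in the snippet above. As a rough sketch of one way such a cancellation could work, the example below keeps the tasks in a dict keyed by MAC address instead of several parallel lists, so that a task can be looked up and cancelled directly. The names tasks_by_mac, start_child() and cancel_child() are hypothetical, and child() stands in for the imported main() routine.

```python
import asyncio


async def child(mac):
    # Stand-in for the main() routine imported from Dhcpv6_child.py.
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        # Any cleanup would go here; re-raise so the task is marked as cancelled.
        raise


async def demo():
    tasks_by_mac = {}  # hypothetical registry: MAC address -> asyncio.Task

    def start_child(mac):
        if mac in tasks_by_mac:
            return False  # a child already exists for this MAC address
        tasks_by_mac[mac] = asyncio.create_task(child(mac), name=f"child-{mac}")
        return True

    async def cancel_child(mac):
        task = tasks_by_mac.pop(mac, None)
        if task is None:
            return False
        task.cancel()
        # Wait until the cancellation has actually been processed by the task.
        await asyncio.gather(task, return_exceptions=True)
        return task.cancelled()

    first = start_child("aa:bb")
    duplicate = start_child("aa:bb")
    await asyncio.sleep(0)  # give the child a chance to start
    cancelled = await cancel_child("aa:bb")
    return first, duplicate, cancelled, len(tasks_by_mac)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A single dict also avoids having to keep list_task, list_mac and the other lists aligned by index.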

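Finally, because I had trouble managing the task list correctly, I associated a time counter with each child (as shown in the call to main() above). When a task finishes, the child sends a message to the parent, which updates the list of current tasks.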
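
As an alternative to having the child send a message back, the task list can also be kept up to date from the parent side with Task.add_done_callback(). The following is only a sketch under assumed names: child(), active and outcomes are hypothetical, and "lb-1"/"lb-2" are made-up task names.

```python
import asyncio


async def child(name, duration):
    # Stand-in for the child's main() routine.
    await asyncio.sleep(duration)
    return f"{name} done"


async def demo():
    active = {}    # task name -> asyncio.Task, only tasks still running
    outcomes = {}  # task name -> how the task ended

    def on_done(task):
        # Called by the event loop once the task has finished, failed or been cancelled.
        name = task.get_name()
        active.pop(name, None)
        if task.cancelled():
            outcomes[name] = "cancelled"
        elif task.exception() is not None:
            outcomes[name] = f"failed: {task.exception()!r}"
        else:
            outcomes[name] = task.result()

    for name, duration in [("lb-1", 0.01), ("lb-2", 10)]:
        task = asyncio.create_task(child(name, duration), name=name)
        task.add_done_callback(on_done)
        active[name] = task

    await asyncio.sleep(0.05)          # lb-1 has finished by now
    running_midway = sorted(active)

    slow = active["lb-2"]
    slow.cancel()
    await asyncio.gather(slow, return_exceptions=True)
    return running_midway, outcomes, sorted(active)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The callback runs in the parent's event loop, so the bookkeeping needs no extra NATS message and also covers children that end through cancellation or an exception.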