如何使用 asyncio 管理动态任务列表
How to manage a dynamic list of tasks with asyncio
我有一个主 class,它根据插件列表为每个插件创建一个实例。在下面的示例中,启动了 'dhcpv6' 插件,并订阅“subject_test_subscribe”主题。一旦处理程序收到该主题上的新消息,它就会创建一个新的异步任务来运行“Dhcpv6_child().run()”,它是 Dhcpv6.py 中 class 的一个 child。每个 child 都有一个超时时间,到期后该 child 即被终止。补充说明:__init__.py 中定义的 Plugin class 是一个抽象 class,用于加载插件。
我无法使用 asyncio.gather 动态地添加新任务(新任务要等到当前任务完成之后才会被执行)。改用 asyncio.create_subprocess_exec 可以解决问题,但我更希望利用 asyncio 自身提供的任务机制。
main.py
import argparse
import asyncio
import importlib
# Command-line interface: the NATS server address is mandatory; the plugin
# list is optional and takes one or more plugin names.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--nats', '-n',
    nargs="?",
    type=str,
    required=True,
    help="adresse IP du serveur NATS",
)
parser.add_argument(
    '--plugins', '-p',
    nargs="+",
    help="Liste de plugins a utiliser",
)
args = parser.parse_args()

# Startup banner echoing the parsed options.
print("************** main() *****************")
print(f"NATS server IP: {args.nats}")
print(f"List of plugins to load: {args.plugins}")
async def main():
    """Load every requested plugin and run them all concurrently.

    For each name in ``args.plugins`` the module ``Plugins.<name>`` is
    imported, the class of the same name is instantiated with the NATS
    address, and its ``run()`` coroutine is scheduled as a task. A plugin
    that fails to load is reported and skipped. The coroutine then waits
    until all scheduled plugin tasks have finished.
    """
    tasks = []
    # ``--plugins`` is optional, so argparse leaves it as None when the flag
    # is omitted; treat that as "no plugins" instead of failing on iteration.
    for type_plugin in args.plugins or []:
        try:
            module = importlib.import_module(f'Plugins.{type_plugin}')
            my_class = getattr(module, type_plugin)
            my_instance = my_class(nats=args.nats)
            tasks.append(asyncio.create_task(my_instance.run()))
        except Exception as e:
            # One broken plugin must not prevent the others from starting.
            print("Erreur chargement du plugin : ", type_plugin, ":", e)
    try:
        await asyncio.gather(*tasks)
    except asyncio.TimeoutError:
        print("timeout main")
if __name__ == '__main__':
    # asyncio.run() creates the event loop, runs main() to completion and
    # closes the loop afterwards. Unlike create_task() + run_forever(), the
    # process exits once every plugin has finished, and an exception raised
    # by main() reaches the handlers below instead of being lost in a task
    # that nobody awaits.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(e)
Dhcpv6.py
import asyncio
import json
import logging
import sys
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
from Plugins import Plugin
from Plugins.Dhcpv6_child import Dhcpv6_child
# Module-level logger for this plugin, registered under the name "dhcpv6".
logger = logging.getLogger("dhcpv6")
# Configure logging at import time with DEBUG as the minimum level.
logging.basicConfig(level=logging.DEBUG)
class Dhcpv6(Plugin):
    """Plugin that launches one ``Dhcpv6_child`` task per NATS message.

    ``run()`` subscribes to ``subject``; every message received on it
    schedules an independent child task that is given at most
    ``timeout_task`` seconds. Tasks still in flight are kept in
    ``list_task``.
    """

    name = "Dhcpv6 plugin"
    subject = "subject_test_subscribe"

    def __init__(self, **kwargs):
        """Initialise the plugin; the ``nats`` keyword is stored as ``self.nats``."""
        super().__init__(**kwargs)
        self.nats = kwargs.get('nats')
        # Child tasks currently in flight; a finished task removes itself.
        self.list_task = []
        # Maximum run time of a single child, in seconds.
        self.timeout_task = 5
        print("Class dhcpv6 ==> constructor / nats_ip : {}".format(self.nats))

    async def _run_child(self, child):
        """Run ``child.run()``, cancelling it after ``timeout_task`` seconds."""
        try:
            await asyncio.wait_for(child.run(), timeout=self.timeout_task)
        except asyncio.TimeoutError:
            print("Fin du main : timeout atteint")

    def _on_child_done(self, task):
        """Drop a finished child task from ``list_task`` and report a failure."""
        if task in self.list_task:
            self.list_task.remove(task)
        if task.cancelled():
            return
        # Retrieving the exception keeps asyncio from logging
        # "Task exception was never retrieved" for a failed child.
        exc = task.exception()
        if exc is not None:
            print(f"Child task failed: {exc!r}")

    async def run(self):
        """Connect to NATS, subscribe to ``subject`` and stay alive while connected.

        Returns early, after printing a diagnostic, when the connection
        cannot be established.
        """
        print("******* run DHCPV6 ***********")
        nc = NATS()
        try:
            # NOTE(review): the address is hard-coded; self.nats is stored by
            # __init__ but not used here - confirm whether it should be.
            await nc.connect("127.0.0.1", verbose=True, pedantic=True)
        except ErrConnectionClosed:
            print("La connexion a ete fermee inopinement")
            return
        except ErrTimeout:
            print("Le delai imparti a la connexion est depasse")
            return
        except ErrNoServers:
            print("Aucun serveur n'a repondu a temps")
            return
        except Exception as e:
            print(f"Exception inattendue: {e}")
            return

        async def plugin_handler(msg):
            """Schedule a new child task for the received message."""
            print(msg)
            try:
                child = Dhcpv6_child()
                # Awaiting the children here would block this callback until
                # they finish (and a coroutine object cannot be awaited
                # twice), so each child runs as its own task instead.
                task = asyncio.create_task(self._run_child(child))
            except Exception:
                print("Error append new task")
                return
            task.add_done_callback(self._on_child_done)
            self.list_task.append(task)
            print("append a new task : ", self.list_task)
            print("Running a new task : ", self.list_task)

        print(f"Subscribing test on : {self.subject}")
        await nc.subscribe(self.subject, cb=plugin_handler)
        # Keep the plugin alive for as long as the client reports a connection.
        while nc.is_connected:
            await asyncio.sleep(0.5)
        await nc.drain()
以下是屏幕输出:
************** main() *****************
NATS server IP: 127.0.0.1
List of plugins to load: : ['Dhcpv6']
------------ Load a list of plugins : {Dhcpv6} -------
Class plugin ==> constructor
Class dhcpv6 ==> constructor / nats_ip : 127.0.0.1
******* run DHCPV6 ***********
INFO:Plugin:Plugin Dhcpv6 plugin loaded
Subscribing test on : subject_test_subscribe
<Msg: subject='subject_test_subscribe' reply='' data='{"query": ...'>
Class CHILD ==> constructor
append a new task : [<coroutine object wait_for at 0x0000026D0310F040>]
Running a new task : [<coroutine object wait_for at 0x0000026D0310F040>]
好的,Cornelius,感谢您的建议。没错,asyncio.create_task() 允许按需创建任务。如果我想一个接一个地处理任务,就需要添加 asyncio.wait(self.list_task),但这不是我的目的。下面是我在“Dhcpv6.py”中创建新 child 的方式。补充说明:由 asyncio.create_task() 调用的 main() 例程是从 child 脚本“Dhcpv6_child.py”导入的:
if not mac in self.list_mac:
self.list_task.append(asyncio.create_task(main(self.nats,
self.min_inventory['name'],
self.min_inventory,
str(datetime.now()),
str(datetime.now() + self.timeout))))
self.date_start.append(datetime.now())
self.date_end.append(datetime.now() + self.timeout)
self.lb_name.append(self.min_inventory['name'])
self.list_mac.append(mac)
# logger.info(f"{self.lb_name} {task.get_name()} created")
await nc.subscribe(f"call.dhcpv6.{self.min_inventory['name']}", cb=cancel_tasks)
else :
logger.warning(f"[{__name__}] Task adress %s already in list_mac", mac)
最后,由于我在正确管理任务列表方面遇到问题,我为每个 child 关联了一个时间计数器(如上文主例程中所述)。任务完成后,child 会向 parent 发送一条消息,更新当前任务列表。
我有一个主 class,它根据插件列表为每个插件创建一个实例。在下面的示例中,启动了 'dhcpv6' 插件,并订阅“subject_test_subscribe”主题。一旦处理程序收到该主题上的新消息,它就会创建一个新的异步任务来运行“Dhcpv6_child().run()”,它是 Dhcpv6.py 中 class 的一个 child。每个 child 都有一个超时时间,到期后该 child 即被终止。补充说明:__init__.py 中定义的 Plugin class 是一个抽象 class,用于加载插件。
我无法使用 asyncio.gather 动态地添加新任务(新任务要等到当前任务完成之后才会被执行)。改用 asyncio.create_subprocess_exec 可以解决问题,但我更希望利用 asyncio 自身提供的任务机制。
main.py
import argparse
import asyncio
import importlib
# Command-line interface: the NATS server address is mandatory; the plugin
# list is optional and takes one or more plugin names.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--nats', '-n',
    nargs="?",
    type=str,
    required=True,
    help="adresse IP du serveur NATS",
)
parser.add_argument(
    '--plugins', '-p',
    nargs="+",
    help="Liste de plugins a utiliser",
)
args = parser.parse_args()

# Startup banner echoing the parsed options.
print("************** main() *****************")
print(f"NATS server IP: {args.nats}")
print(f"List of plugins to load: {args.plugins}")
async def main():
    """Load every requested plugin and run them all concurrently.

    For each name in ``args.plugins`` the module ``Plugins.<name>`` is
    imported, the class of the same name is instantiated with the NATS
    address, and its ``run()`` coroutine is scheduled as a task. A plugin
    that fails to load is reported and skipped. The coroutine then waits
    until all scheduled plugin tasks have finished.
    """
    tasks = []
    # ``--plugins`` is optional, so argparse leaves it as None when the flag
    # is omitted; treat that as "no plugins" instead of failing on iteration.
    for type_plugin in args.plugins or []:
        try:
            module = importlib.import_module(f'Plugins.{type_plugin}')
            my_class = getattr(module, type_plugin)
            my_instance = my_class(nats=args.nats)
            tasks.append(asyncio.create_task(my_instance.run()))
        except Exception as e:
            # One broken plugin must not prevent the others from starting.
            print("Erreur chargement du plugin : ", type_plugin, ":", e)
    try:
        await asyncio.gather(*tasks)
    except asyncio.TimeoutError:
        print("timeout main")
if __name__ == '__main__':
    # asyncio.run() creates the event loop, runs main() to completion and
    # closes the loop afterwards. Unlike create_task() + run_forever(), the
    # process exits once every plugin has finished, and an exception raised
    # by main() reaches the handlers below instead of being lost in a task
    # that nobody awaits.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(e)
Dhcpv6.py
import asyncio
import json
import logging
import sys
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
from Plugins import Plugin
from Plugins.Dhcpv6_child import Dhcpv6_child
# Module-level logger for this plugin, registered under the name "dhcpv6".
logger = logging.getLogger("dhcpv6")
# Configure logging at import time with DEBUG as the minimum level.
logging.basicConfig(level=logging.DEBUG)
class Dhcpv6(Plugin):
    """Plugin that launches one ``Dhcpv6_child`` task per NATS message.

    ``run()`` subscribes to ``subject``; every message received on it
    schedules an independent child task that is given at most
    ``timeout_task`` seconds. Tasks still in flight are kept in
    ``list_task``.
    """

    name = "Dhcpv6 plugin"
    subject = "subject_test_subscribe"

    def __init__(self, **kwargs):
        """Initialise the plugin; the ``nats`` keyword is stored as ``self.nats``."""
        super().__init__(**kwargs)
        self.nats = kwargs.get('nats')
        # Child tasks currently in flight; a finished task removes itself.
        self.list_task = []
        # Maximum run time of a single child, in seconds.
        self.timeout_task = 5
        print("Class dhcpv6 ==> constructor / nats_ip : {}".format(self.nats))

    async def _run_child(self, child):
        """Run ``child.run()``, cancelling it after ``timeout_task`` seconds."""
        try:
            await asyncio.wait_for(child.run(), timeout=self.timeout_task)
        except asyncio.TimeoutError:
            print("Fin du main : timeout atteint")

    def _on_child_done(self, task):
        """Drop a finished child task from ``list_task`` and report a failure."""
        if task in self.list_task:
            self.list_task.remove(task)
        if task.cancelled():
            return
        # Retrieving the exception keeps asyncio from logging
        # "Task exception was never retrieved" for a failed child.
        exc = task.exception()
        if exc is not None:
            print(f"Child task failed: {exc!r}")

    async def run(self):
        """Connect to NATS, subscribe to ``subject`` and stay alive while connected.

        Returns early, after printing a diagnostic, when the connection
        cannot be established.
        """
        print("******* run DHCPV6 ***********")
        nc = NATS()
        try:
            # NOTE(review): the address is hard-coded; self.nats is stored by
            # __init__ but not used here - confirm whether it should be.
            await nc.connect("127.0.0.1", verbose=True, pedantic=True)
        except ErrConnectionClosed:
            print("La connexion a ete fermee inopinement")
            return
        except ErrTimeout:
            print("Le delai imparti a la connexion est depasse")
            return
        except ErrNoServers:
            print("Aucun serveur n'a repondu a temps")
            return
        except Exception as e:
            print(f"Exception inattendue: {e}")
            return

        async def plugin_handler(msg):
            """Schedule a new child task for the received message."""
            print(msg)
            try:
                child = Dhcpv6_child()
                # Awaiting the children here would block this callback until
                # they finish (and a coroutine object cannot be awaited
                # twice), so each child runs as its own task instead.
                task = asyncio.create_task(self._run_child(child))
            except Exception:
                print("Error append new task")
                return
            task.add_done_callback(self._on_child_done)
            self.list_task.append(task)
            print("append a new task : ", self.list_task)
            print("Running a new task : ", self.list_task)

        print(f"Subscribing test on : {self.subject}")
        await nc.subscribe(self.subject, cb=plugin_handler)
        # Keep the plugin alive for as long as the client reports a connection.
        while nc.is_connected:
            await asyncio.sleep(0.5)
        await nc.drain()
以下是屏幕输出:
************** main() *****************
NATS server IP: 127.0.0.1
List of plugins to load: : ['Dhcpv6']
------------ Load a list of plugins : {Dhcpv6} -------
Class plugin ==> constructor
Class dhcpv6 ==> constructor / nats_ip : 127.0.0.1
******* run DHCPV6 ***********
INFO:Plugin:Plugin Dhcpv6 plugin loaded
Subscribing test on : subject_test_subscribe
<Msg: subject='subject_test_subscribe' reply='' data='{"query": ...'>
Class CHILD ==> constructor
append a new task : [<coroutine object wait_for at 0x0000026D0310F040>]
Running a new task : [<coroutine object wait_for at 0x0000026D0310F040>]
好的,Cornelius,感谢您的建议。没错,asyncio.create_task() 允许按需创建任务。如果我想一个接一个地处理任务,就需要添加 asyncio.wait(self.list_task),但这不是我的目的。下面是我在“Dhcpv6.py”中创建新 child 的方式。补充说明:由 asyncio.create_task() 调用的 main() 例程是从 child 脚本“Dhcpv6_child.py”导入的:
if not mac in self.list_mac:
self.list_task.append(asyncio.create_task(main(self.nats,
self.min_inventory['name'],
self.min_inventory,
str(datetime.now()),
str(datetime.now() + self.timeout))))
self.date_start.append(datetime.now())
self.date_end.append(datetime.now() + self.timeout)
self.lb_name.append(self.min_inventory['name'])
self.list_mac.append(mac)
# logger.info(f"{self.lb_name} {task.get_name()} created")
await nc.subscribe(f"call.dhcpv6.{self.min_inventory['name']}", cb=cancel_tasks)
else :
logger.warning(f"[{__name__}] Task adress %s already in list_mac", mac)
最后,由于我在正确管理任务列表方面遇到问题,我为每个 child 关联了一个时间计数器(如上文主例程中所述)。任务完成后,child 会向 parent 发送一条消息,更新当前任务列表。