Prefect:如何根据从参数派生的任务列表强制创建任务
Prefect: How to imperatively create tasks based on a task list derived from a Parameter
我正在尝试根据列表强制定义任务。挑战在于列表应基于 Prefect 参数。
下面是我试过的代码,但显然它不起作用,因为 task_dependency_pairs 是一项任务,而不是列表。
如何在不破坏参数任务与其他动态生成的任务之间的依赖性的情况下使其工作?
from prefect import task, Flow, Parameter, Task
import time
@task
def task_dependency_pairs(param):
return [
('task 1', f'{param}A', ''),
('task 2', f'{param}B','task 1'),
('task 3', f'{param}C','task 1')]
class Task_class(Task):
def run(self, **kwarg):
time.sleep(5)
print(f"This task {kwarg['task_name']} does a lot of things with {kwarg.get('calc_value','')}.")
for task_name, dependency in task_dependency_pairs:
globals()[task_name] = type(task_name, (Task_class,),{"__module__": __name__})
with Flow("my_process") as flow:
param = Parameter("param", default="default_param")
task_dependency_pairs_list = task_dependency_pairs(param)
for task_name, calc_value, dependency in task_dependency_pairs_list: # This won't work
task_instance = globals()[task_name](name=task_name)
flow.add_task(task_instance(task_name = task_name, calc_value = calc_value))
for task_name, calc_value, dependency in task_dependency_pairs_list: # This won't work
if len(dependency) >0:
flow.get_tasks(name=task_name)[0].set_upstream(flow.get_tasks(name=dependency)[0])
flow.visualize()
尝试在完美流程中动态创建任务是最好的管理方式via the mapping functionality.
但是,映射只会在流程 运行 期间从可迭代生成任务。它不会随意调整生成任务的依赖关系;它们都共享在主要 mapped
任务中定义的依赖关系。
但是,如果您想在 运行 时间生成流程(具有编程依赖性),我能想到的唯一方法是创建一个创建流程的任务并立即 运行就这样了。
这可能会查找您的流程的方式是:
...
@task
def run_flow(inputs):
with Flow("subflow") as sub_flow:
for (name, calc_value, dependency) in inputs:
inst = Task_class(name=name)(task_name=name, calc_value=calc_value)
sub_flow.add_task(inst)
if dependency:
inst.set_upstream(sub_flow.get_tasks(name=dependency)[0])
sub_flow.run()
with Flow("my_process") as flow:
param = Parameter("param", default="default_param")
task_dependency_pairs_list = task_dependency_pairs(param)
run_flow(task_dependency_pairs_list)
我正在尝试根据列表强制定义任务。挑战在于列表应基于 Prefect 参数。
下面是我试过的代码,但显然它不起作用,因为 task_dependency_pairs 是一项任务,而不是列表。
如何在不破坏参数任务与其他动态生成的任务之间的依赖性的情况下使其工作?
from prefect import task, Flow, Parameter, Task
import time
@task
def task_dependency_pairs(param):
return [
('task 1', f'{param}A', ''),
('task 2', f'{param}B','task 1'),
('task 3', f'{param}C','task 1')]
class Task_class(Task):
def run(self, **kwarg):
time.sleep(5)
print(f"This task {kwarg['task_name']} does a lot of things with {kwarg.get('calc_value','')}.")
for task_name, dependency in task_dependency_pairs:
globals()[task_name] = type(task_name, (Task_class,),{"__module__": __name__})
with Flow("my_process") as flow:
param = Parameter("param", default="default_param")
task_dependency_pairs_list = task_dependency_pairs(param)
for task_name, calc_value, dependency in task_dependency_pairs_list: # This won't work
task_instance = globals()[task_name](name=task_name)
flow.add_task(task_instance(task_name = task_name, calc_value = calc_value))
for task_name, calc_value, dependency in task_dependency_pairs_list: # This won't work
if len(dependency) >0:
flow.get_tasks(name=task_name)[0].set_upstream(flow.get_tasks(name=dependency)[0])
flow.visualize()
尝试在完美流程中动态创建任务是最好的管理方式via the mapping functionality.
但是,映射只会在流程 运行 期间从可迭代生成任务。它不会随意调整生成任务的依赖关系;它们都共享在主要 mapped
任务中定义的依赖关系。
但是,如果您想在 运行 时间生成流程(具有编程依赖性),我能想到的唯一方法是创建一个创建流程的任务并立即 运行就这样了。
这可能会查找您的流程的方式是:
...
@task
def run_flow(inputs):
with Flow("subflow") as sub_flow:
for (name, calc_value, dependency) in inputs:
inst = Task_class(name=name)(task_name=name, calc_value=calc_value)
sub_flow.add_task(inst)
if dependency:
inst.set_upstream(sub_flow.get_tasks(name=dependency)[0])
sub_flow.run()
with Flow("my_process") as flow:
param = Parameter("param", default="default_param")
task_dependency_pairs_list = task_dependency_pairs(param)
run_flow(task_dependency_pairs_list)