Prefect 中的循环任务
Looping tasks in Prefect
我想一次又一次地循环执行任务,直到达到某个条件,然后再继续其余的工作流程。
目前我得到的是:
# Loop task
class MyLoop(Task):
def run(self):
loop_res = prefect.context.get("task_loop_result", 1)
print (loop_res)
if loop_res >= 10:
return loop_res
raise LOOP(result=loop_res+1)
但据我了解,这不适用于多个任务。
有没有办法进一步返回并一次循环执行多个任务?
除非我遗漏了什么,否则答案是否定的。
Prefect 流程是 DAGs,而您所描述的(一次又一次地按顺序循环多个任务,直到满足某些条件)会形成一个循环,所以您不能这样做。
这可能有帮助,也可能没有帮助,但您可以尝试将要循环的所有任务合并为一个任务,并在该任务中循环,直到满足退出条件。
解决方案很简单,就是创建一个单独的任务,该任务本身会创建一个带有一个或多个参数的新流并调用 flow.run()。例如:
class MultipleTaskLoop(Task):
def run(self):
# Get previous value
loop_res = prefect.context.get("task_loop_result", 1)
# Create subflow
with Flow('Subflow', executor=LocalDaskExecutor()) as flow:
x = Parameter('x', default = 1)
loop1 = print_loop()
add = add_value(x)
loop2 = print_loop()
loop1.set_downstream(add)
add.set_downstream(loop2)
# Run subflow and extract result
subflow_res = flow.run(parameters={'x': loop_res})
new_res = subflow_res.result[add]._result.value
# Loop
if new_res >= 10:
return new_res
raise LOOP(result=new_res)
其中 print_loop
只是在输出中打印“循环”,add_value
将它接收到的值加一。
我想一次又一次地循环执行任务,直到达到某个条件,然后再继续其余的工作流程。
目前我得到的是:
# Loop task
class MyLoop(Task):
def run(self):
loop_res = prefect.context.get("task_loop_result", 1)
print (loop_res)
if loop_res >= 10:
return loop_res
raise LOOP(result=loop_res+1)
但据我了解,这不适用于多个任务。 有没有办法进一步返回并一次循环执行多个任务?
除非我遗漏了什么,否则答案是否定的。
Prefect 流程是 DAGs,而您所描述的(一次又一次地按顺序循环多个任务,直到满足某些条件)会形成一个循环,所以您不能这样做。
这可能有帮助,也可能没有帮助,但您可以尝试将要循环的所有任务合并为一个任务,并在该任务中循环,直到满足退出条件。
解决方案很简单,就是创建一个单独的任务,该任务本身会创建一个带有一个或多个参数的新流并调用 flow.run()。例如:
class MultipleTaskLoop(Task):
def run(self):
# Get previous value
loop_res = prefect.context.get("task_loop_result", 1)
# Create subflow
with Flow('Subflow', executor=LocalDaskExecutor()) as flow:
x = Parameter('x', default = 1)
loop1 = print_loop()
add = add_value(x)
loop2 = print_loop()
loop1.set_downstream(add)
add.set_downstream(loop2)
# Run subflow and extract result
subflow_res = flow.run(parameters={'x': loop_res})
new_res = subflow_res.result[add]._result.value
# Loop
if new_res >= 10:
return new_res
raise LOOP(result=new_res)
其中 print_loop
只是在输出中打印“循环”,add_value
将它接收到的值加一。