如何在失败时恢复 Prefect 流程而无需重新 运行 整个流程?
How to resume a Prefect flow on failure without having to re-run the entire flow?
TL;DR;
我无法使用 prefect 的 FlowRunner 来解决上述问题。我可能用错了(见下文)或遗漏了一些东西。非常感谢任何指点!
问题
我通读了很棒的 prefect 核心文档 并找到了有关 处理失败 和 本地调试 [= 的部分45=] 与此最相关(可能遗漏了什么!)。 FlowRunner class 似乎(对我而言)是解决方案。
看看我是否可以使用 Flow Runner 恢复失败的流程:
- 创建了一个失败的流程运行:
from time import sleep
import prefect
from prefect import Flow, task
@task
def success():
sleep(3)
return
@task
def failure():
return 1 / 0
def get_flow_runner():
with Flow("Success/Failure") as flow:
success()
failure()
return prefect.engine.FlowRunner(flow)
- 运行 它在 iPython 并保存状态:
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: state = flow_runner.run()
将 failure()
中的 1 / 0 替换为 1 / 1 因此任务会成功:
最后将之前的状态传递给flow_runner
希望它能恢复流:
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: flow_runner.run(task_states=state.result)
整个流程 运行 再次包括 3 秒的成功任务。
这里的问题是您正在用每个 运行 重建流程,这会更改任务对象。 state.result
是一个字典,其键是 Task 对象 - 如果底层 Task 对象以任何方式发生变化,它的散列也会发生变化。您应该改为使用更新的 Task 对象手动创建状态字典,如下所示:
from prefect.engine.state import Success
failure_task = runner.flow.get_tasks(name="failure")[0]
task_states = {failure_task: Success("Mocked success")}
TL;DR;
我无法使用 prefect 的 FlowRunner 来解决上述问题。我可能用错了(见下文)或遗漏了一些东西。非常感谢任何指点!
问题
我通读了很棒的 prefect 核心文档 并找到了有关 处理失败 和 本地调试 [= 的部分45=] 与此最相关(可能遗漏了什么!)。 FlowRunner class 似乎(对我而言)是解决方案。
看看我是否可以使用 Flow Runner 恢复失败的流程:
- 创建了一个失败的流程运行:
from time import sleep
import prefect
from prefect import Flow, task
@task
def success():
sleep(3)
return
@task
def failure():
return 1 / 0
def get_flow_runner():
with Flow("Success/Failure") as flow:
success()
failure()
return prefect.engine.FlowRunner(flow)
- 运行 它在 iPython 并保存状态:
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: state = flow_runner.run()
将
failure()
中的 1 / 0 替换为 1 / 1 因此任务会成功:最后将之前的状态传递给
flow_runner
希望它能恢复流:
In [1]: run nameofscript.py
In [2]: flow_runner = get_flow_runner()
In [3]: flow_runner.run(task_states=state.result)
整个流程 运行 再次包括 3 秒的成功任务。
这里的问题是您正在用每个 运行 重建流程,这会更改任务对象。 state.result
是一个字典,其键是 Task 对象 - 如果底层 Task 对象以任何方式发生变化,它的散列也会发生变化。您应该改为使用更新的 Task 对象手动创建状态字典,如下所示:
from prefect.engine.state import Success
failure_task = runner.flow.get_tasks(name="failure")[0]
task_states = {failure_task: Success("Mocked success")}