如果 `prefect` 任务失败,是否可以使用不同的 `dask` 参数重新 运行 它?
If `prefect` task failed, is it possible to re-run it with different `dask` parameters?
考虑一个 prefect
任务,其内存需求事先未知。如果任务失败是因为worker内存不足,是否可以修改dask
worker参数,重新运行任务?
如果有一种方法可以在每次失败后将每个 worker 的内存分配增加一些值,那就太好了。
很难给出一个笼统的答案,因为这取决于您的基础设施。
- 例如,如果您想为每个流
cluster_class
临时 运行 的 Dask cluster_class
提供自定义关键字参数,您可以将动态函数传递给 DaskExecutor
的cluster_class
。此函数可以从 Parameter
任务中检索诸如 n_workers
之类的值,如下所示:
import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor
def dynamic_executor():
from distributed import LocalCluster
# could be instead some other class e.g. from dask_cloudprovider.aws import FargateCluster
return LocalCluster(n_workers=prefect.context.parameters["n_workers"])
with Flow(
"dynamic_n_workers", executor=DaskExecutor(cluster_class=dynamic_executor)
) as flow:
flow.add_task(Parameter("n_workers", default=5))
这意味着您可以使用临时定义的 n_workers
不同值启动新流程 运行。
- 第二个选项是在每个流 运行 的基础上在 运行 配置中分配更多内存 - 例如您可以从 UI:
覆盖 KubernetesRun
上的 memory_request
集
with Flow(
FLOW_NAME,
storage=STORAGE,
run_config=KubernetesRun(
labels=["k8s"],
cpu_request=0.5,
memory_request="2Gi",
),
) as flow:
上面的代码片段定义了 2 GB,但是如果您注意到流程 运行 以 OOM 错误结束并且您需要更多,您可以从 [=44] 触发新的流程 运行 =] 具有更高的内存请求。
- 最后一个选项是直接在流定义中覆盖执行程序值:
import coiled
from prefect.executors import DaskExecutor
flow.executor = DaskExecutor(
cluster_class=coiled.Cluster,
cluster_kwargs={
"software": "user/software_env_name",
"shutdown_on_close": True,
"name": "prefect-cluster",
"scheduler_memory": "4 GiB",
"worker_memory": "8 GiB",
},
)
只要您使用脚本存储(例如 Git 存储 类 之一,例如 GitHub、Git、Gitlab、Bitbucket等)而不是 pickle 存储,并且您使用 worker_memory
的修改值提交代码,这应该反映在您的新流程 运行 中,因为有关执行程序的元数据未存储在后端 - 它是从您的流存储中检索。
考虑一个 prefect
任务,其内存需求事先未知。如果任务失败是因为worker内存不足,是否可以修改dask
worker参数,重新运行任务?
如果有一种方法可以在每次失败后将每个 worker 的内存分配增加一些值,那就太好了。
很难给出一个笼统的答案,因为这取决于您的基础设施。
- 例如,如果您想为每个流
cluster_class
临时 运行 的 Daskcluster_class
提供自定义关键字参数,您可以将动态函数传递给DaskExecutor
的cluster_class
。此函数可以从Parameter
任务中检索诸如n_workers
之类的值,如下所示:
import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor
def dynamic_executor():
from distributed import LocalCluster
# could be instead some other class e.g. from dask_cloudprovider.aws import FargateCluster
return LocalCluster(n_workers=prefect.context.parameters["n_workers"])
with Flow(
"dynamic_n_workers", executor=DaskExecutor(cluster_class=dynamic_executor)
) as flow:
flow.add_task(Parameter("n_workers", default=5))
这意味着您可以使用临时定义的 n_workers
不同值启动新流程 运行。
- 第二个选项是在每个流 运行 的基础上在 运行 配置中分配更多内存 - 例如您可以从 UI: 覆盖
KubernetesRun
上的 memory_request
集
with Flow(
FLOW_NAME,
storage=STORAGE,
run_config=KubernetesRun(
labels=["k8s"],
cpu_request=0.5,
memory_request="2Gi",
),
) as flow:
上面的代码片段定义了 2 GB,但是如果您注意到流程 运行 以 OOM 错误结束并且您需要更多,您可以从 [=44] 触发新的流程 运行 =] 具有更高的内存请求。
- 最后一个选项是直接在流定义中覆盖执行程序值:
import coiled
from prefect.executors import DaskExecutor
flow.executor = DaskExecutor(
cluster_class=coiled.Cluster,
cluster_kwargs={
"software": "user/software_env_name",
"shutdown_on_close": True,
"name": "prefect-cluster",
"scheduler_memory": "4 GiB",
"worker_memory": "8 GiB",
},
)
只要您使用脚本存储(例如 Git 存储 类 之一,例如 GitHub、Git、Gitlab、Bitbucket等)而不是 pickle 存储,并且您使用 worker_memory
的修改值提交代码,这应该反映在您的新流程 运行 中,因为有关执行程序的元数据未存储在后端 - 它是从您的流存储中检索。