在 Prefect 中,任务值是否可以在流 运行 期间缓存?

In Prefect, can a task value be cached for the duration of the flow run?

我有一个使用 .map() 的流程;因此,我“循环”了多个输入,但是有些输入我只需要生成一次,但我注意到我的流程不断重新生成它们。

是否可以在 运行 期间 cache/checkpoint 任务的结果(用于其他任务)?

我的理解是可以像这样缓存一段特定的时间:

import datetime

from prefect import task

@task(cache_for=datetime.timedelta(hours=1))
def some_task():
    ...

但是,如果 运行 小于 cache_for 时间,缓存是否仍会保留下一个 运行(如果不是,我猜长时间的缓存会工作)。

是的,有几种不同的方法可以实现这种类型的缓存:

使用不同的缓存验证器

除了配置缓存过期(如上所示),您还可以选择配置 cache validator。在您的情况下,您可以使用输入或参数验证器。

使用缓存键

您可以通过在您的任务上指定 cache_key 在任务之间“共享”缓存(在单个流程内和跨流程):

@task(cache_for=datetime.timedelta(hours=1), cache_key="my-key")
def some_task():
    ...

然后这将通过按键而不是任务 ID 查找您的候选 Cached 状态。

使用 file-based 目标

最后,越来越流行的设置是使用 file-based target for your task。然后,您可以使用 flow_run_id 之类的内容和提供给您的任务的输入来模板化此目标字符串。每当任务运行时,它首先检查指定目标位置是否存在数据,如果找到,则不重新运行。例如:

@task(target="{flow_run_id}/{scheduled_start_time:%Y-%d-%m}/results.bytes")
def some_task():
    ...

如果满足以下两个条件,则此模板具有 re-using 目标数据的效果:

  • 任务在同一天运行重新
  • 任务重新运行作为同一流程的一部分运行

然后您可以跨多个任务(或者在您的情况下,跨所有映射的子任务)共享此模板。

请注意,如果需要,您还可以向 target 模板提供输入和参数。