在 Prefect 中,任务值是否可以在流 运行 期间缓存?
In Prefect, can a task value be cached for the duration of the flow run?
我有一个使用 .map()
的流程;因此,我“循环”了多个输入,但是有些输入我只需要生成一次,但我注意到我的流程不断重新生成它们。
是否可以在 运行 期间 cache/checkpoint 任务的结果(用于其他任务)?
我的理解是可以像这样缓存一段特定的时间:
import datetime
from prefect import task
@task(cache_for=datetime.timedelta(hours=1))
def some_task():
...
但是,如果 运行 小于 cache_for
时间,缓存是否仍会保留下一个 运行(如果不是,我猜长时间的缓存会工作)。
是的,有几种不同的方法可以实现这种类型的缓存:
使用不同的缓存验证器
除了配置缓存过期(如上所示),您还可以选择配置 cache validator。在您的情况下,您可以使用输入或参数验证器。
使用缓存键
您可以通过在您的任务上指定 cache_key
在任务之间“共享”缓存(在单个流程内和跨流程):
@task(cache_for=datetime.timedelta(hours=1), cache_key="my-key")
def some_task():
...
然后这将通过按键而不是任务 ID 查找您的候选 Cached
状态。
使用 file-based 目标
最后,越来越流行的设置是使用 file-based target
for your task。然后,您可以使用 flow_run_id
之类的内容和提供给您的任务的输入来模板化此目标字符串。每当任务运行时,它首先检查指定目标位置是否存在数据,如果找到,则不重新运行。例如:
@task(target="{flow_run_id}/{scheduled_start_time:%Y-%d-%m}/results.bytes")
def some_task():
...
如果满足以下两个条件,则此模板具有 re-using 目标数据的效果:
- 任务在同一天运行重新
- 任务重新运行作为同一流程的一部分运行
然后您可以跨多个任务(或者在您的情况下,跨所有映射的子任务)共享此模板。
请注意,如果需要,您还可以向 target
模板提供输入和参数。
我有一个使用 .map()
的流程;因此,我“循环”了多个输入,但是有些输入我只需要生成一次,但我注意到我的流程不断重新生成它们。
是否可以在 运行 期间 cache/checkpoint 任务的结果(用于其他任务)?
我的理解是可以像这样缓存一段特定的时间:
import datetime
from prefect import task
@task(cache_for=datetime.timedelta(hours=1))
def some_task():
...
但是,如果 运行 小于 cache_for
时间,缓存是否仍会保留下一个 运行(如果不是,我猜长时间的缓存会工作)。
是的,有几种不同的方法可以实现这种类型的缓存:
使用不同的缓存验证器
除了配置缓存过期(如上所示),您还可以选择配置 cache validator。在您的情况下,您可以使用输入或参数验证器。
使用缓存键
您可以通过在您的任务上指定 cache_key
在任务之间“共享”缓存(在单个流程内和跨流程):
@task(cache_for=datetime.timedelta(hours=1), cache_key="my-key")
def some_task():
...
然后这将通过按键而不是任务 ID 查找您的候选 Cached
状态。
使用 file-based 目标
最后,越来越流行的设置是使用 file-based target
for your task。然后,您可以使用 flow_run_id
之类的内容和提供给您的任务的输入来模板化此目标字符串。每当任务运行时,它首先检查指定目标位置是否存在数据,如果找到,则不重新运行。例如:
@task(target="{flow_run_id}/{scheduled_start_time:%Y-%d-%m}/results.bytes")
def some_task():
...
如果满足以下两个条件,则此模板具有 re-using 目标数据的效果:
- 任务在同一天运行重新
- 任务重新运行作为同一流程的一部分运行
然后您可以跨多个任务(或者在您的情况下,跨所有映射的子任务)共享此模板。
请注意,如果需要,您还可以向 target
模板提供输入和参数。