有没有办法在每个计划流程 运行 时更新参数?
Is there a way to update parameters as each scheduled flow run?
我正在尝试找到一种方法来更新用于计划流程每次迭代的参数 begin。
例如,假设我有一个流计划 运行 每年每周一一次。对于第一个星期一,流量需要 运行,参数为 5。下星期一需要 运行,参数为 7,依此类推。每周 运行 所需的参数会改变一个常数。
根据文档,我似乎可以为每个 运行 创建一个具有相应参数的时钟,但这对于计划许多 运行 的流程来说似乎过分了。
在 Prefect 中有更简单的方法吗?
这听起来像是您想将某种形式的业务逻辑封装到您的 Parameter
计算中,这是向您的流程添加新 Task
的一个很好的用例:
import prefect
from prefect import task, Flow, Parameter
varying_param = Parameter("param", default=None) # none as the default
@task(name="Varying Parameter")
def param_calculation(p):
time = prefect.context.scheduled_start_time
# if a value was provided, use it
if p is not None:
return p
# do some calculations to decide what value is appropriate
# and return it
with Flow("Minimal example") as flow:
param_value = param_calculation(varying_param)
# now use this value in downstream tasks
我正在尝试找到一种方法来更新用于计划流程每次迭代的参数 begin。
例如,假设我有一个流计划 运行 每年每周一一次。对于第一个星期一,流量需要 运行,参数为 5。下星期一需要 运行,参数为 7,依此类推。每周 运行 所需的参数会改变一个常数。
根据文档,我似乎可以为每个 运行 创建一个具有相应参数的时钟,但这对于计划许多 运行 的流程来说似乎过分了。
在 Prefect 中有更简单的方法吗?
这听起来像是您想将某种形式的业务逻辑封装到您的 Parameter
计算中,这是向您的流程添加新 Task
的一个很好的用例:
import prefect
from prefect import task, Flow, Parameter
varying_param = Parameter("param", default=None) # none as the default
@task(name="Varying Parameter")
def param_calculation(p):
time = prefect.context.scheduled_start_time
# if a value was provided, use it
if p is not None:
return p
# do some calculations to decide what value is appropriate
# and return it
with Flow("Minimal example") as flow:
param_value = param_calculation(varying_param)
# now use this value in downstream tasks