有没有办法为 Prefect 中的新流程回填历史数据(一次)?
Is there a way to backfill historical data (once) for a new Flow in Prefect?
我刚开始阅读有关 Prefect 的内容(并且有一点使用 Airflow 的经验)。
我的目标是在 Prefect 中设置一个每天 运行 的任务,并将数据收集到一个文件夹中(我想这正是 Prefect 可以帮助我做的事情)。
此外,我的目标是填充历史数据(就好像我 运行 这份工作回到了过去)。
在 Airflow 中有一个 start_date 的概念,在过去设置时,DAG 将 运行 自该日期起并在每个时间间隔填充。
例如,如果我有一个任务需要一个日期和 returns 该日期的数据,例如:
# Pseudo code
def get_values_from_somewhere(date: datetime) -> dict:
return fetched_values_in_json(date)
Prefect 中有本地方法可以做到这一点吗?
尽管提到了回填 here,但我在这里或文档中的任何地方都找不到这个答案。
任何帮助/指导都将非常有用。
我尝试了什么:
当我设置schedule
为:
from datetime import datetime, timedelta
from prefect.schedules import Schedule
schedule = Schedule(clocks=[IntervalClock(interval=timedelta(hours=24), start_date=datetime(2019, 1, 1))])
然后我做 flow.run()
我只是得到:
INFO:prefect.My-Task:Waiting for next scheduled run at 2020-09-24T00:00:00+00:00
我期望的是 运行 自从我提供的 start_date
之后暂停,直到到达当前时间并等待下一个时间表。
Prefect 不会对您的流程或其任务如何依赖于时间做出任何隐含的假设,因此执行回填取决于您的流程的结构。时间 明确地 影响流的方式通常有两种:
- 通过
Parameter
或 DateTimeParameter
- 到
prefect.context
(其中包括许多 time-related 字段,描述为 here)
鉴于此,可以通过创建任意数量的 ad-hoc 计划流 运行 并覆盖适当的上下文键或默认参数值来实现回填。 (请注意,可以为任何流程创建 ad-hoc 运行s,无论该流程是否有计划。)
为了更精确,这里有两个触发单个回填 运行 的示例(为了容纳更多 运行s,遍历适当的值并为 运行 创建一个 运行每个):
使用上下文
import pendulum
import prefect
@prefect.task
def do_something_time_specific():
"""
This task uses the timestamp provided to the custom `backfill_time`
context key; if that does not exist, it falls back on the built-in
`scheduled_start_time` context key.
"""
current_time = prefect.context.get("backfill_time") or prefect.context.get("scheduled_start_time")
if isinstance(current_time, str):
current_time = pendulum.parse(current_time)
# performs some action dealing with this timestamp
flow = Flow("backfill", tasks=[do_something_time_specific])
## using Core
flow.run() # will use current timestamp
flow.run(context={"backfill_time": "1986-01-02"}) # will use old timestamp
## using an API
prefect.Client().create_flow_run(flow_id="FLOWID",
context={"backfill_time": "1986-01-02"}) # will use old timestamp
使用参数
import pendulum
import prefect
current_time = prefect.Parameter("current_time", default=None)
@prefect.task
def do_something_time_specific(current_time):
"""
This task uses the timestamp provided to the task explicitly.
"""
current_time = current_time or pendulum.now("utc") # uses "now" if not provided
if isinstance(current_time, str):
current_time = pendulum.parse(current_time)
# performs some action dealing with this timestamp
with Flow("backfill") as flow:
do_something_time_specific(current_time)
## using Core
flow.run() # will use current timestamp
flow.run(current_time="1986-01-02") # will use old timestamp
## using an API
prefect.Client().create_flow_run(flow_id="FLOWID",
parameters={"current_time": "1986-01-02"}) # will use old timestamp
较新的参数 类 例如 DateTimeParameter
提供更好的类型保证,但希望这能证明这个想法。
EDIT:为了完整起见,请注意 ad-hoc 运行s 可以在 Core 中为具有时间表的流程创建 运行ning flow.run(run_on_schedule=False)
我刚开始阅读有关 Prefect 的内容(并且有一点使用 Airflow 的经验)。
我的目标是在 Prefect 中设置一个每天 运行 的任务,并将数据收集到一个文件夹中(我想这正是 Prefect 可以帮助我做的事情)。 此外,我的目标是填充历史数据(就好像我 运行 这份工作回到了过去)。
在 Airflow 中有一个 start_date 的概念,在过去设置时,DAG 将 运行 自该日期起并在每个时间间隔填充。
例如,如果我有一个任务需要一个日期和 returns 该日期的数据,例如:
# Pseudo code
def get_values_from_somewhere(date: datetime) -> dict:
return fetched_values_in_json(date)
Prefect 中有本地方法可以做到这一点吗? 尽管提到了回填 here,但我在这里或文档中的任何地方都找不到这个答案。 任何帮助/指导都将非常有用。
我尝试了什么:
当我设置schedule
为:
from datetime import datetime, timedelta
from prefect.schedules import Schedule
schedule = Schedule(clocks=[IntervalClock(interval=timedelta(hours=24), start_date=datetime(2019, 1, 1))])
然后我做 flow.run()
我只是得到:
INFO:prefect.My-Task:Waiting for next scheduled run at 2020-09-24T00:00:00+00:00
我期望的是 运行 自从我提供的 start_date
之后暂停,直到到达当前时间并等待下一个时间表。
Prefect 不会对您的流程或其任务如何依赖于时间做出任何隐含的假设,因此执行回填取决于您的流程的结构。时间 明确地 影响流的方式通常有两种:
- 通过
Parameter
或DateTimeParameter
- 到
prefect.context
(其中包括许多 time-related 字段,描述为 here)
鉴于此,可以通过创建任意数量的 ad-hoc 计划流 运行 并覆盖适当的上下文键或默认参数值来实现回填。 (请注意,可以为任何流程创建 ad-hoc 运行s,无论该流程是否有计划。)
为了更精确,这里有两个触发单个回填 运行 的示例(为了容纳更多 运行s,遍历适当的值并为 运行 创建一个 运行每个):
使用上下文
import pendulum
import prefect
@prefect.task
def do_something_time_specific():
"""
This task uses the timestamp provided to the custom `backfill_time`
context key; if that does not exist, it falls back on the built-in
`scheduled_start_time` context key.
"""
current_time = prefect.context.get("backfill_time") or prefect.context.get("scheduled_start_time")
if isinstance(current_time, str):
current_time = pendulum.parse(current_time)
# performs some action dealing with this timestamp
flow = Flow("backfill", tasks=[do_something_time_specific])
## using Core
flow.run() # will use current timestamp
flow.run(context={"backfill_time": "1986-01-02"}) # will use old timestamp
## using an API
prefect.Client().create_flow_run(flow_id="FLOWID",
context={"backfill_time": "1986-01-02"}) # will use old timestamp
使用参数
import pendulum
import prefect
current_time = prefect.Parameter("current_time", default=None)
@prefect.task
def do_something_time_specific(current_time):
"""
This task uses the timestamp provided to the task explicitly.
"""
current_time = current_time or pendulum.now("utc") # uses "now" if not provided
if isinstance(current_time, str):
current_time = pendulum.parse(current_time)
# performs some action dealing with this timestamp
with Flow("backfill") as flow:
do_something_time_specific(current_time)
## using Core
flow.run() # will use current timestamp
flow.run(current_time="1986-01-02") # will use old timestamp
## using an API
prefect.Client().create_flow_run(flow_id="FLOWID",
parameters={"current_time": "1986-01-02"}) # will use old timestamp
较新的参数 类 例如 DateTimeParameter
提供更好的类型保证,但希望这能证明这个想法。
EDIT:为了完整起见,请注意 ad-hoc 运行s 可以在 Core 中为具有时间表的流程创建 运行ning flow.run(run_on_schedule=False)