如何进入知府核心本地任务最终状态?
How to get in prefect core local task final state?
我构建了一个流程,如果 kwarg 为空,它会隐式跳过 运行 给定任务。
我在任务函数中使用类似这样的东西来跳过逻辑:
if kwargs.get('processors', Hierarchy()).__len__() == 0:
raise signals.SKIP('skipping task',
result=Prediction())
我想构建一些单元测试以确保跳过所述任务的最终状态。在任务级别获取状态的最简单方法是什么?
我可以从文档中看到如何获取流程而不是任务。
更新
为了补充 Chris 的回复,我使用了他提出的第一个选项。由于我的流程是在测试之外定义的,因此我创建了一个简单的函数来获取一组已跳过的任务。在测试中,将其与应该跳过的任务列表进行了比较:
def get_skipped_tasks(flow_state):
return set(key.name for key, value in flow_state.result.items() if value.is_skipped())
为了完整起见,我将在此处包括几种方法;对于我的例子,我将使用这个基本流程:
from prefect import task, Flow
from prefect.engine.signals import SKIP
import random
@task
def random_number():
return random.randint(0, 100)
@task
def is_even(num):
if num % 2:
raise SKIP("odd number")
return True
with Flow("dummy") as flow:
even_task = is_even(random_number)
运行全流程
当 运行 交互时,您总是可以 运行 整个流并从父流 运行 状态访问单个任务状态;请注意,当您“调用”任务(例如,is_even(random_number)
)时会创建一个副本,因此您需要正确跟踪这些副本。
flow_state = flow.run()
assert flow_state.result[even_task].is_skipped() # for example
运行 一段带有模拟数据的流程
当 运行ning 交互时,您还可以传递任务字典 -> 声明 运行ner 将尊重;可以选择向这些状态提供数据:
from prefect.engine.state import Success
mocked_state = Success(result=2)
flow_state = flow.run(task_states={random_number: mocked_state})
assert not flow_state.result[even_task].is_skipped()
使用任务运行ner
最后,如果您想 运行 单独对此任务进行基于状态的测试,您可以使用 TaskRunner
。这会变得 有点 更复杂,因为您必须使用 Edge
s 重新创建上游依赖项。
from prefect.engine.task_runner import TaskRunner
from prefect.edge import Edge
runner = TaskRunner(task=even_task)
edge = Edge(key="num", upstream_task=random_number, downstream_task=even_task)
task_state = runner.run(upstream_states={edge: mocked_state})
assert not task_state.is_skipped()
我构建了一个流程,如果 kwarg 为空,它会隐式跳过 运行 给定任务。
我在任务函数中使用类似这样的东西来跳过逻辑:
if kwargs.get('processors', Hierarchy()).__len__() == 0:
raise signals.SKIP('skipping task',
result=Prediction())
我想构建一些单元测试以确保跳过所述任务的最终状态。在任务级别获取状态的最简单方法是什么?
我可以从文档中看到如何获取流程而不是任务。
更新
为了补充 Chris 的回复,我使用了他提出的第一个选项。由于我的流程是在测试之外定义的,因此我创建了一个简单的函数来获取一组已跳过的任务。在测试中,将其与应该跳过的任务列表进行了比较:
def get_skipped_tasks(flow_state):
return set(key.name for key, value in flow_state.result.items() if value.is_skipped())
为了完整起见,我将在此处包括几种方法;对于我的例子,我将使用这个基本流程:
from prefect import task, Flow
from prefect.engine.signals import SKIP
import random
@task
def random_number():
return random.randint(0, 100)
@task
def is_even(num):
if num % 2:
raise SKIP("odd number")
return True
with Flow("dummy") as flow:
even_task = is_even(random_number)
运行全流程
当 运行 交互时,您总是可以 运行 整个流并从父流 运行 状态访问单个任务状态;请注意,当您“调用”任务(例如,is_even(random_number)
)时会创建一个副本,因此您需要正确跟踪这些副本。
flow_state = flow.run()
assert flow_state.result[even_task].is_skipped() # for example
运行 一段带有模拟数据的流程
当 运行ning 交互时,您还可以传递任务字典 -> 声明 运行ner 将尊重;可以选择向这些状态提供数据:
from prefect.engine.state import Success
mocked_state = Success(result=2)
flow_state = flow.run(task_states={random_number: mocked_state})
assert not flow_state.result[even_task].is_skipped()
使用任务运行ner
最后,如果您想 运行 单独对此任务进行基于状态的测试,您可以使用 TaskRunner
。这会变得 有点 更复杂,因为您必须使用 Edge
s 重新创建上游依赖项。
from prefect.engine.task_runner import TaskRunner
from prefect.edge import Edge
runner = TaskRunner(task=even_task)
edge = Edge(key="num", upstream_task=random_number, downstream_task=even_task)
task_state = runner.run(upstream_states={edge: mocked_state})
assert not task_state.is_skipped()