Get the S3 URL path of a Metaflow artifact
Is there a way to get the full S3 URL path of a Metaflow artifact that was stored in a step?
I looked at Metaflow's DataArtifact class but didn't see an obvious S3 path property.
Yes, you can do:
Flow('MyFlow')[42]['foo'].task.artifacts.bar._object['location']
where MyFlow is the flow name, 42 is the run ID, foo is the step in question, and bar is the artifact from that step.
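Note that _object starts with an underscore, so by Python convention it is an internal detail rather than a documented property. A minimal defensive sketch, assuming only that the artifact carries a dict in _object with a "location" key as in the expression above; artifact_location is a hypothetical helper name, not part of Metaflow:

```python
from typing import Any, Optional


def artifact_location(artifact: Any) -> Optional[str]:
    """Return the storage location recorded on an artifact, or None.

    Hypothetical helper: it only assumes the object carries a dict in
    `_object` with a "location" key, as in the expression above.
    """
    meta = getattr(artifact, "_object", None)
    if not isinstance(meta, dict):
        # No metadata dict on this object, so there is nothing to report.
        return None
    return meta.get("location")
```

It would be called as artifact_location(Flow('MyFlow')[42]['foo'].task.artifacts.bar), and returns None instead of raising if the internal layout is not what was assumed.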
Based on @Savin's answer, I wrote a helper function that gets the S3 URL of an artifact, given a run ID and the artifact name:
from typing import List, Optional, Union

from metaflow import Flow, Metaflow, Run, namespace


class DataArtifactNotFoundError(Exception):
    """Custom exception assumed by the original snippet; defined here so the code is self-contained."""


def get_artifact_s3url_from_run(
    run: Union[str, Run],
    name: str,
    legacy_names: Optional[List[str]] = None,
    missing_ok: bool = False,
) -> Optional[str]:
    """
    Given a Metaflow Run and a key, scan the run's tasks and return the S3 URL of the artifact with that key.

    NOTE: use get_artifact_from_run() if you want the artifact itself, not the S3 URL to the artifact.

    This allows us to find data artifacts even in flows that did not finish. If we change the name of an artifact,
    we can support backwards compatibility by also passing in the legacy keys. Note: we can avoid this by resuming a
    specific run and adding a node which re-maps the artifact to another key. This will assign the run a new ID.

    Args:
        run: a metaflow.Run object, or a run pathspec such as "MyFlow/42"
        name: name of the artifact to look for
        legacy_names: backup names to check
        missing_ok: whether to allow an artifact to be missing

    Returns:
        the artifact's S3 URL, or None if it is not found and missing_ok=True

    Raises:
        DataArtifactNotFoundError if artifact is not found and missing_ok=False
        ValueError if Flow not found
        ValueError if Flow is found but run ID is not.
    """
    namespace(None)  # allows us to access all runs in all namespaces
    names_to_check = [name] + list(legacy_names or [])
    if isinstance(run, str):
        try:
            run = Run(run)
        except Exception as e:
            # Run not found. Check whether the flow exists so we can list alternatives.
            flow_name = run.split(sep="/")[0]
            try:
                flow = Flow(flow_name)
            except Exception as e2:
                raise ValueError(
                    f"Could not find flow {flow_name}. Available flows: {Metaflow().flows}"
                ) from e2
            # Raised outside the inner try so it is not swallowed by the flow-lookup handler.
            run_ids = [r.id for r in flow.runs()]
            raise ValueError(f"Could not find run ID {run}. Possible values: {run_ids}") from e
    for name_ in names_to_check:
        for step_ in run:
            for task in step_:
                # Iterating a Task yields its DataArtifact objects; match on the artifact name.
                for artifact in task:
                    if artifact.id == name_:
                        # _object is a private attribute holding the artifact's metadata.
                        return artifact._object["location"]
    if not missing_ok:
        raise DataArtifactNotFoundError(
            f"No data artifact with name {name} found in {run}. Also checked legacy names: {legacy_names}"
        )
    return None
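Once you have the location string, you will often want the bucket and key separately, for example to pass to an S3 client. A small sketch using only the standard library, assuming the location has the usual s3://bucket/key form; split_s3_url and the example URL are hypothetical and only for illustration:

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_url(url: str) -> Tuple[str, str]:
    """Split 's3://bucket/key/parts' into (bucket, key)."""
    parsed = urlparse(url)
    if parsed.scheme != "s3" or not parsed.netloc:
        # Anything that is not s3://<bucket>/... is rejected explicitly.
        raise ValueError(f"Not an S3 URL: {url!r}")
    return parsed.netloc, parsed.path.lstrip("/")


# Made-up URL, not a real artifact location.
bucket, key = split_s3_url("s3://my-bucket/metaflow/MyFlow/data/ab/abcdef")
print(bucket)  # my-bucket
print(key)     # metaflow/MyFlow/data/ab/abcdef
```

Raising on anything that is not an s3:// URL makes it obvious when a location points somewhere other than S3.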