Get S3 URL path of Metaflow artifact

Is there a way to get the full S3 URL path of a Metaflow artifact that was stored in a step?

I looked at Metaflow's DataArtifact class, but I don't see an obvious S3 path attribute on it.

Yes, you can:

Flow('MyFlow')[42]['foo'].task.artifacts.bar._object['location']

where MyFlow is the flow name, 42 is the run ID, foo is the step in question, and bar is an artifact of that step.
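
For reference, here is a minimal, self-contained sketch of that lookup using the same placeholder names (MyFlow, 42, foo, bar). Note that _object is a private attribute, so this may change between Metaflow versions:

from metaflow import Flow, namespace

namespace(None)  # optional: look across all namespaces, not just your own

# Placeholder names: substitute your own flow, run ID, step, and artifact.
run = Flow("MyFlow")["42"]
s3_url = run["foo"].task.artifacts.bar._object["location"]
print(s3_url)  # e.g. s3://<bucket>/<datastore-prefix>/...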

Building on @Savin's answer, I wrote a helper function that returns an artifact's S3 URL, given a run ID and the artifact's name:

from metaflow import Flow, Metaflow, Run, namespace
from typing import List, Union


# Defined here so the snippet is self-contained; use your own exception type if you have one.
class DataArtifactNotFoundError(Exception):
    """Raised when no artifact with the requested name is found in a run."""


def get_artifact_s3url_from_run(
    run: Union[str, Run], name: str, legacy_names: List[str] = [], missing_ok: bool = False
) -> str:
    """
    Given a Metaflow Run and an artifact name, scans the run's tasks and returns the S3 URL of the artifact with that name.

    NOTE: use get_artifact_from_run() if you want the artifact itself, not the S3 URL to the artifact.

    This allows us to find data artifacts even in flows that did not finish. If we change the name of an artifact,
    we can support backwards compatibility by also passing in the legacy keys. Note: we can avoid this by resuming a
    specific run and adding a node which re-maps the artifact to another key. This will assign the run a new ID.

    Args:
        run: a metaflow.Run() object, or a run pathspec string like "FlowName/run_id"
        name: name of the artifact to look for in the run's tasks
        legacy_names: backup names to check
        missing_ok: whether to allow the artifact to be missing

    Returns:
        the S3 URL of the artifact. If the artifact is not found and missing_ok is True, None is returned.

    Raises:
        DataArtifactNotFoundError: if the artifact is not found and missing_ok=False
        ValueError: if the flow is not found
        ValueError: if the flow is found but the run ID is not
    """
    namespace(None)  # allows us to access all runs in all namespaces
    names_to_check = [name] + legacy_names
    if isinstance(run, str):
        try:
            run = Run(run)
        except Exception as e:
            # run ID not found. see if we can find other runs and list them
            flow = run.split(sep="/")[0]
            try:
                flow = Flow(flow)
                raise ValueError(f"Could not find run ID {run}. Possible values: {flow.runs()}") from e
            except Exception as e2:
                raise ValueError(f"Could not find flow {flow}. Available flows: {Metaflow().flows}") from e2
    for name_ in names_to_check:
        for step_ in run:
            for task in step_:
                print(f"task {task} artifacts: {task.artifacts} \n \n")
                if task.artifacts is not None and hasattr(task.artifacts, name_):
                    # return the S3 location recorded for this artifact
                    return getattr(task.artifacts, name_)._object["location"]

    if not missing_ok:
        raise DataArtifactNotFoundError(
            f"No data artifact with name {name} found in {run}. Also checked legacy names: {legacy_names}"
        )
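
A quick usage sketch, with hypothetical flow, run, and artifact names; the run is passed as a "FlowName/run_id" pathspec string, which is what the string branch above (and the Run() constructor) expects:

if __name__ == "__main__":
    # Hypothetical flow/run/artifact names for illustration.
    url = get_artifact_s3url_from_run(
        run="MyFlow/42",
        name="bar",
        legacy_names=["bar_v1"],
        missing_ok=False,
    )
    print(url)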