wandb: artifact.add_reference() 选项来添加特定的(不是当前的)versionId 或 ETag 来停止重新上传到 s3 的需要?
wandb: artifact.add_reference() option to add specific (not current) versionId or ETag to stop the need for re-upload to s3?
我觉得这应该是可行的,但我查看了 wandb SDK 代码,但找不到 easy/logical 的方法。它 可能 可以通过修改清单条目在稍后的某个时间点破解它(但可能在工件被记录到 wandb 之前清单和条目可能被锁定)?我在 SDK 代码中看到了这样的事情:
version = manifest_entry.extra.get("versionID")
etag = manifest_entry.extra.get("etag")
那么,我想我们或许可以编辑那些?
更新
所以,我尝试将它与类似的东西一起破解并且它有效但感觉不对:
import os
import wandb
import boto3
from wandb.util import md5_file
ENTITY = os.environ.get("WANDB_ENTITY")
PROJECT = os.environ.get("WANDB_PROJECT")
API_KEY = os.environ.get("WANDB_API_KEY")
api = api = wandb.Api(overrides={"entity": ENTITY, "project": ENTITY})
run = wandb.init(entity=ENTITY, project=PROJECT, job_type="test upload")
file = "admin2Codes.txt" # "admin1CodesASCII.txt" # (both already on s3 with a couple versions)
artifact = wandb.Artifact("test_data", type="dataset")
# modify one of the local files so it has a new md5hash etc.
with open(file, "a") as f:
f.write("new_line_1\n")
# upload local file to s3
local_file_path = file
s3_url = f"s3://bucket/prefix/{file}"
s3_url_arr = s3_url.replace("s3://", "").split("/")
s3_bucket = s3_url_arr[0]
key = "/".join(s3_url_arr[1:])
s3_client = boto3.client("s3")
file_digest = md5_file(local_file_path)
s3_client.upload_file(
local_file_path,
s3_bucket,
key,
# save the md5_digest in metadata,
# can be used later to only upload new files to s3,
# as AWS doesn't digest the file consistently in the E-tag
ExtraArgs={"Metadata": {"md5_digest": file_digest}},
)
head_response = s3_client.head_object(Bucket=s3_bucket, Key=key)
version_id: str = head_response["VersionId"]
print(version_id)
# upload a link/ref to this s3 object in wandb:
artifact.add_reference(s3_dir)
# at this point we might be able to modify the artifact._manifest.entries and each entry.extra.get("etag") etc.?
print([(name, entry.extra) for name, entry in artifact._manifest.entries.items()])
# set these to an older version on s3 that we know we want (rather than latest) - do this via wandb public API:
dataset_v2 = api.artifact(f"{ENTITY}/{PROJECT}/test_data:v2", type="dataset")
# artifact._manifest.add_entry(dataset_v2.manifest.entries["admin1CodesASCII.txt"])
artifact._manifest.entries["admin1CodesASCII.txt"] = dataset_v2.manifest.entries[
"admin1CodesASCII.txt"
]
# verify that it did change:
print([(name, entry.extra) for name, entry in artifact._manifest.entries.items()])
run.log_artifact(artifact) # at this point the manifest is locked I believe?
artifact.wait() # wait for upload to finish (blocking - but should be very quick given it is just an s3 link)
print(artifact.name)
run_id = run.id
run.finish()
curr_run = api.run(f"{ENTITY}/{PROJECT}/{run_id}")
used_artifacts = curr_run.used_artifacts()
logged_artifacts = curr_run.logged_artifacts()
我走的路对吗?我想另一个解决方法是在 s3 上制作一个副本(这样旧版本又是最新的)但我想避免这种情况,因为我想使用旧版本的 1 个文件是一个大型 NLP 模型,并且唯一的文件我想更改小 config.json 个文件等(所以再次上传所有文件似乎很浪费)。
我还想知道,当我将对象的旧版本复制回存储桶中的同一个键时,是否会创建一个真正的副本,或者就像指向同一底层对象的指针一样。 boto3 和 AWS 文档都没有明确说明 - 尽管它看起来像是一个正确的副本。
我想我现在找到了正确的方法:
import os
import wandb
import boto3
from wandb.util import md5_file
ENTITY = os.environ.get("WANDB_ENTITY")
PROJECT = os.environ.get("WANDB_PROJECT")
def wandb_update_only_some_files_in_artifact(
existing_artifact_name: str,
new_s3_file_urls: list[str],
entity: str = ENTITY,
project: str = PROJECT,
) -> Artifact:
"""If you want to just update a config.json file for example,
but the rest of the artifact can remain the same, then you can
use this functions like so:
wandb_update_only_some_files_in_artifact(
"old_artifact:v3",
["s3://bucket/prefix/config.json"],
)
and then all the other files like model.bin will be the same as in v3,
even if there was a v4 or v5 in between (as the v3 VersionIds are used)
Args:
existing_artifact_name (str): name with version like "old_artifact:v3"
new_s3_file_urls (list[str]): files that should be updated
entity (str, optional): wandb entity. Defaults to ENTITY.
project (str, optional): wandb project. Defaults to PROJECT.
Returns:
Artifact: the new artifact object
"""
api = wandb.Api(overrides={"entity": entity, "project": project})
old_artifact = api.artifact(existing_artifact_name)
old_artifact_name = re.sub(r":v\d+$", "", old_artifact.name)
with wandb.init(entity=entity, project=project) as run:
new_artifact = wandb.Artifact(old_artifact_name, type=old_artifact.type)
s3_file_names = [s3_url.split("/")[-1] for s3_url in new_s3_file_urls]
# add the new ones:
for s3_url, filename in zip(new_s3_file_urls, s3_file_names):
new_artifact.add_reference(s3_url, filename)
# add the old ones:
for filename, entry in old_artifact.manifest.entries.items():
if filename in s3_file_names:
continue
new_artifact.add_reference(entry, filename)
# this also works but feels hackier:
# new_artifact._manifest.entries[filename] = entry
run.log_artifact(new_artifact)
new_artifact.wait() # wait for upload to finish (blocking - but should be very quick given it is just an s3 link)
print(new_artifact.name)
print(run.id)
return new_artifact
# usage:
local_file_path = "config.json" # modified file
s3_url = "s3://bucket/prefix/config.json"
s3_url_arr = s3_url.replace("s3://", "").split("/")
s3_bucket = s3_url_arr[0]
key = "/".join(s3_url_arr[1:])
s3_client = boto3.client("s3")
file_digest = md5_file(local_file_path)
s3_client.upload_file(
local_file_path,
s3_bucket,
key,
# save the md5_digest in metadata,
# can be used later to only upload new files to s3,
# as AWS doesn't digest the file consistently in the E-tag
ExtraArgs={"Metadata": {"md5_digest": file_digest}},
)
wandb_update_only_some_files_in_artifact(
"old_artifact:v3",
["s3://bucket/prefix/config.json"],
)
我觉得这应该是可行的,但我查看了 wandb SDK 代码,但找不到 easy/logical 的方法。它 可能 可以通过修改清单条目在稍后的某个时间点破解它(但可能在工件被记录到 wandb 之前清单和条目可能被锁定)?我在 SDK 代码中看到了这样的事情:
version = manifest_entry.extra.get("versionID")
etag = manifest_entry.extra.get("etag")
那么,我想我们或许可以编辑那些?
更新
所以,我尝试将它与类似的东西一起破解并且它有效但感觉不对:
import os
import wandb
import boto3
from wandb.util import md5_file
ENTITY = os.environ.get("WANDB_ENTITY")
PROJECT = os.environ.get("WANDB_PROJECT")
API_KEY = os.environ.get("WANDB_API_KEY")
api = api = wandb.Api(overrides={"entity": ENTITY, "project": ENTITY})
run = wandb.init(entity=ENTITY, project=PROJECT, job_type="test upload")
file = "admin2Codes.txt" # "admin1CodesASCII.txt" # (both already on s3 with a couple versions)
artifact = wandb.Artifact("test_data", type="dataset")
# modify one of the local files so it has a new md5hash etc.
with open(file, "a") as f:
f.write("new_line_1\n")
# upload local file to s3
local_file_path = file
s3_url = f"s3://bucket/prefix/{file}"
s3_url_arr = s3_url.replace("s3://", "").split("/")
s3_bucket = s3_url_arr[0]
key = "/".join(s3_url_arr[1:])
s3_client = boto3.client("s3")
file_digest = md5_file(local_file_path)
s3_client.upload_file(
local_file_path,
s3_bucket,
key,
# save the md5_digest in metadata,
# can be used later to only upload new files to s3,
# as AWS doesn't digest the file consistently in the E-tag
ExtraArgs={"Metadata": {"md5_digest": file_digest}},
)
head_response = s3_client.head_object(Bucket=s3_bucket, Key=key)
version_id: str = head_response["VersionId"]
print(version_id)
# upload a link/ref to this s3 object in wandb:
artifact.add_reference(s3_dir)
# at this point we might be able to modify the artifact._manifest.entries and each entry.extra.get("etag") etc.?
print([(name, entry.extra) for name, entry in artifact._manifest.entries.items()])
# set these to an older version on s3 that we know we want (rather than latest) - do this via wandb public API:
dataset_v2 = api.artifact(f"{ENTITY}/{PROJECT}/test_data:v2", type="dataset")
# artifact._manifest.add_entry(dataset_v2.manifest.entries["admin1CodesASCII.txt"])
artifact._manifest.entries["admin1CodesASCII.txt"] = dataset_v2.manifest.entries[
"admin1CodesASCII.txt"
]
# verify that it did change:
print([(name, entry.extra) for name, entry in artifact._manifest.entries.items()])
run.log_artifact(artifact) # at this point the manifest is locked I believe?
artifact.wait() # wait for upload to finish (blocking - but should be very quick given it is just an s3 link)
print(artifact.name)
run_id = run.id
run.finish()
curr_run = api.run(f"{ENTITY}/{PROJECT}/{run_id}")
used_artifacts = curr_run.used_artifacts()
logged_artifacts = curr_run.logged_artifacts()
我走的路对吗?我想另一个解决方法是在 s3 上制作一个副本(这样旧版本又是最新的)但我想避免这种情况,因为我想使用旧版本的 1 个文件是一个大型 NLP 模型,并且唯一的文件我想更改小 config.json 个文件等(所以再次上传所有文件似乎很浪费)。
我还想知道,当我将对象的旧版本复制回存储桶中的同一个键时,是否会创建一个真正的副本,或者就像指向同一底层对象的指针一样。 boto3 和 AWS 文档都没有明确说明 - 尽管它看起来像是一个正确的副本。
我想我现在找到了正确的方法:
import os
import wandb
import boto3
from wandb.util import md5_file
ENTITY = os.environ.get("WANDB_ENTITY")
PROJECT = os.environ.get("WANDB_PROJECT")
def wandb_update_only_some_files_in_artifact(
existing_artifact_name: str,
new_s3_file_urls: list[str],
entity: str = ENTITY,
project: str = PROJECT,
) -> Artifact:
"""If you want to just update a config.json file for example,
but the rest of the artifact can remain the same, then you can
use this functions like so:
wandb_update_only_some_files_in_artifact(
"old_artifact:v3",
["s3://bucket/prefix/config.json"],
)
and then all the other files like model.bin will be the same as in v3,
even if there was a v4 or v5 in between (as the v3 VersionIds are used)
Args:
existing_artifact_name (str): name with version like "old_artifact:v3"
new_s3_file_urls (list[str]): files that should be updated
entity (str, optional): wandb entity. Defaults to ENTITY.
project (str, optional): wandb project. Defaults to PROJECT.
Returns:
Artifact: the new artifact object
"""
api = wandb.Api(overrides={"entity": entity, "project": project})
old_artifact = api.artifact(existing_artifact_name)
old_artifact_name = re.sub(r":v\d+$", "", old_artifact.name)
with wandb.init(entity=entity, project=project) as run:
new_artifact = wandb.Artifact(old_artifact_name, type=old_artifact.type)
s3_file_names = [s3_url.split("/")[-1] for s3_url in new_s3_file_urls]
# add the new ones:
for s3_url, filename in zip(new_s3_file_urls, s3_file_names):
new_artifact.add_reference(s3_url, filename)
# add the old ones:
for filename, entry in old_artifact.manifest.entries.items():
if filename in s3_file_names:
continue
new_artifact.add_reference(entry, filename)
# this also works but feels hackier:
# new_artifact._manifest.entries[filename] = entry
run.log_artifact(new_artifact)
new_artifact.wait() # wait for upload to finish (blocking - but should be very quick given it is just an s3 link)
print(new_artifact.name)
print(run.id)
return new_artifact
# usage:
local_file_path = "config.json" # modified file
s3_url = "s3://bucket/prefix/config.json"
s3_url_arr = s3_url.replace("s3://", "").split("/")
s3_bucket = s3_url_arr[0]
key = "/".join(s3_url_arr[1:])
s3_client = boto3.client("s3")
file_digest = md5_file(local_file_path)
s3_client.upload_file(
local_file_path,
s3_bucket,
key,
# save the md5_digest in metadata,
# can be used later to only upload new files to s3,
# as AWS doesn't digest the file consistently in the E-tag
ExtraArgs={"Metadata": {"md5_digest": file_digest}},
)
wandb_update_only_some_files_in_artifact(
"old_artifact:v3",
["s3://bucket/prefix/config.json"],
)