wandb: опция Artifact.add_reference() для добавления определенного (не текущего) идентификатора версии или ETag, чтобы предотвратить повторную загрузку на s3?

Я чувствую, что это должно быть возможно, но я просмотрел код SDK wandb и не могу найти простой/логичный способ сделать это. Возможно , его удастся взломать, изменив записи манифеста в какой-то момент позже (но, возможно, до того, как артефакт будет зарегистрирован в wandb, поскольку тогда манифест и записи могут быть заблокированы)? Я видел такие вещи в коде SDK:

      version = manifest_entry.extra.get("versionID")
etag = manifest_entry.extra.get("etag")

Итак, я попытался взломать его вместе с чем-то вроде этого, и это работает, но кажется неправильным:

      import os
import wandb
import boto3

ENTITY = os.environ.get("WANDB_ENTITY")
PROJECT = "test" # os.environ.get("WANDB_PROJECT")
API_KEY = os.environ.get("WANDB_API_KEY")

api = wandb.Api()
run = wandb.init(entity=ENTITY, project=PROJECT, job_type="test upload")
file = "admin2Codes.txt"  # "admin1CodesASCII.txt" # (both already on s3 with a couple versions)
artifact = wandb.Artifact("test_data", type="dataset")

# modify one of the local files so it has a new md5hash etc.
with open(file, "a") as f:
    f.write("new_line_1\n")

# upload local file to s3
local_file_path = file
s3_url = f"s3://{ENTITY}/{PROJECT}/artifacts/dataset/test_data/{file}"
s3_url_arr = s3_url.replace("s3://", "").split("/")
s3_bucket = s3_url_arr[0]
filename = s3_url_arr[-1]
s3_dir = s3_url.replace(filename, "")
key = "/".join(s3_url_arr[1:])

s3_client = boto3.client("s3")
file_digest = md5_file(local_file_path)
s3_client.upload_file(
    local_file_path,
    s3_bucket,
    key,
    # save the md5_digest in metadata,
    # can be used later to only upload new files to s3,
    # and AWS don't digest the file consistently in the E-tag
    ExtraArgs={"Metadata": {"md5_digest": file_digest}},
)
head_response = s3_client.head_object(Bucket=s3_bucket, Key=key)
version_id: str = head_response["VersionId"]
print(version_id)

# upload a link/ref to this s3 object in wandb:
artifact.add_reference(s3_dir)
# at this point we might be able to modify the artifact._manifest.entries and each entry.extra.get("etag") etc.?
print([(name, entry.extra) for name, entry in artifact._manifest.entries.items()])
# set these to an older version on s3 that we know we want (rather than latest) - do this via wandb public API:
dataset_v2 = api.artifact(f"{ENTITY}/{PROJECT}/test_data:v2", type="dataset")
# artifact._manifest.add_entry(dataset_v2.manifest.entries["admin1CodesASCII.txt"])
artifact._manifest.entries["admin1CodesASCII.txt"] = dataset_v2.manifest.entries[
    "admin1CodesASCII.txt"
]
# verify that it did change:
print([(name, entry.extra) for name, entry in artifact._manifest.entries.items()])

run.log_artifact(artifact)  # at this point the manifest is locked I believe?
artifact.wait()  # wait for upload to finish (blocking - but should be very quick given it is just an s3 link)
print(artifact.name)
run_id = run.id
run.finish()
curr_run = api.run(f"{ENTITY}/{PROJECT}/{run_id}")
used_artifacts = curr_run.used_artifacts()
logged_artifacts = curr_run.logged_artifacts()

Я на правильном пути здесь? Я предполагаю, что другим обходным путем является создание копии на s3 (чтобы более старая версия снова была самой последней), но я хотел избежать этого, поскольку 1 файл, который я хочу использовать в старой версии, - это большая модель NLP и единственные файлы Я хочу изменить небольшие файлы config.json и т. д. (поэтому кажется очень расточительным загружать все файлы снова).

1 ответ

Я думаю, что нашел правильный способ сделать это сейчас:

      import os
import wandb
import boto3
from wandb.util import md5_file

ENTITY = os.environ.get("WANDB_ENTITY")
PROJECT = os.environ.get("WANDB_PROJECT")


def wandb_update_only_some_files_in_artifact(
    existing_artifact_name: str,
    new_s3_file_urls: list[str],
    entity: str = ENTITY,
    project: str = PROJECT,
) -> Artifact:
    """If you want to just update a config.json file for example,
    but the rest of the artifact can remain the same, then you can
    use this functions like so:
    wandb_update_only_some_files_in_artifact(
        "old_artifact:v3",
        ["s3://bucket/prefix/config.json"],
    )
    and then all the other files like model.bin will be the same as in v3,
    even if there was a v4 or v5 in between (as the v3 VersionIds are used)

    Args:
        existing_artifact_name (str): name with version like "old_artifact:v3"
        new_s3_file_urls (list[str]): files that should be updated
        entity (str, optional): wandb entity. Defaults to ENTITY.
        project (str, optional): wandb project. Defaults to PROJECT.

    Returns:
        Artifact: the new artifact object
    """
    api = wandb.Api(overrides={"entity": entity, "project": project})
    old_artifact = api.artifact(existing_artifact_name)
    old_artifact_name = re.sub(r":v\d+$", "", old_artifact.name)
    with wandb.init(entity=entity, project=project) as run:
        new_artifact = wandb.Artifact(old_artifact_name, type=old_artifact.type)

        s3_file_names = [s3_url.split("/")[-1] for s3_url in new_s3_file_urls]
        # add the new ones:
        for s3_url, filename in zip(new_s3_file_urls, s3_file_names):
            new_artifact.add_reference(s3_url, filename)
        # add the old ones:
        for filename, entry in old_artifact.manifest.entries.items():
            if filename in s3_file_names:
                continue
            new_artifact.add_reference(entry, filename)
            # this also works but feels hackier:
            # new_artifact._manifest.entries[filename] = entry

        run.log_artifact(new_artifact)
        new_artifact.wait()  # wait for upload to finish (blocking - but should be very quick given it is just an s3 link)
        print(new_artifact.name)
        print(run.id)
    return new_artifact


# usage:
local_file_path = "config.json" # modified file
s3_url = "s3://bucket/prefix/config.json"
s3_url_arr = s3_url.replace("s3://", "").split("/")
s3_bucket = s3_url_arr[0]
key = "/".join(s3_url_arr[1:])

s3_client = boto3.client("s3")
file_digest = md5_file(local_file_path)
s3_client.upload_file(
    local_file_path,
    s3_bucket,
    key,
    # save the md5_digest in metadata,
    # can be used later to only upload new files to s3,
    # as AWS doesn't digest the file consistently in the E-tag
    ExtraArgs={"Metadata": {"md5_digest": file_digest}},
)

wandb_update_only_some_files_in_artifact(
    "old_artifact:v3",
    ["s3://bucket/prefix/config.json"],
)
Другие вопросы по тегам