почему kubeflow в конвейерах вершин ai не хранит метаданные для артефакта набора данных

Я пытаюсь передать метаданные между компонентами функции Python, прикрепив к ним выходной артефакт в конвейере vertex ai kubeflow, из документации это кажется простым, но как бы я ни старался, я не могу заставить его работать. Я пытаюсь прикрепить строку к артефакту Output[Dataset] в одном компоненте, а затем использовать ее в следующем компоненте. Пример:

Этот конвейер состоит из двух компонентов: один для создания набора данных и прикрепления к нему метаданных, другой для получения артефакта набора данных и последующего доступа к метаданным.

Я пробовал с записью данных в файл и без нее.

      from kfp.dsl import pipeline, component
from kfp.dsl import Input, Output, Dataset, Metrics, Model
from kfp import compiler, dsl

@component(packages_to_install=["pandas"], base_image='python:3.9')
def make_metadata(
  data: Output[Dataset],
):
    import pandas as pd
    param_out_df = pd.DataFrame({"dummy_col": "dummy_row"}, index=[0])
    param_out_df.to_csv(data.path, index=False)
    
    data.metadata["data_num"] = 1
    data.metadata["data_str"] = "random string"    
  
@component(packages_to_install=["pandas"], base_image='python:3.9')
def use_metadata(
    data: Input[Dataset],
):
    print("data - metadata")
    print(data.metadata)
    
@dsl.pipeline(
   name='test-pipeline',
   description='An example pipeline that performs arithmetic calculations.', 
   pipeline_root=f'{BUCKET}/pipelines'
)
def metadata_pipeline():
    metadata_made = make_metadata()
    
    used_metadata = use_metadata(data=metadata_made.outputs["data"])
    
PIPELINE_NAME = "test-pipeline"    
PIPELINE_FILENAME = f"{PIPELINE_NAME}.yaml"

compiler.Compiler().compile(
  pipeline_func=metadata_pipeline, 
  package_path=PIPELINE_FILENAME

Этот код запускает созданный выше файл yaml конвейера в вершине.

      import datetime as datetime
from google.cloud import aiplatform

current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
test_run_name = f"{PIPELINE_NAME}_{current_time}"

aiplatform.init(project=PROJECT_ID, location=LOCATION, )
job = aiplatform.pipeline_jobs.PipelineJob(
    display_name=test_run_name,
    template_path=PIPELINE_FILENAME
)
job.run(sync=False)

Установленные пакеты kfp следующие:

      kfp==2.0.0b13
kfp-pipeline-spec==0.2.0
kfp-server-api==2.0.0a6

Я не только не могу увидеть его в операторе печати, что бы я ни попробовал, он также не отобразится в области происхождения метаданных вершины ai (заменяет чувствительный на «xxx»

      {
  "name": "xxx",
  "displayName": "data",
  "instanceSchemaTitle": "system.Dataset",
  "uri": "xxx",
  "etag": "xxx",
  "createTime": "2023-03-17T10:52:10.040Z",
  "updateTime": "2023-03-17T10:53:01.621Z",
  "state": "LIVE",
  "schemaTitle": "system.Dataset",
  "schemaVersion": "0.0.1",
  "metadata": {}
}

Любая помощь будет очень признательна, я понимаю, что могу передавать данные другими способами, такими как OutputPath, но концептуально привязывать их к элементам предпочтительнее, поскольку метаданные относятся к этому элементу.

Я строго следовал этому руководству, оно тоже не работает:

Vertex AI Pipelines: облегченные компоненты Python на основе функций и компонентный ввод-вывод.

Как и выше, я не вижу метаданные, прикрепленные к компоненту предварительной обработки, когда смотрю на происхождение или пытаюсь получить к ним доступ в следующем компоненте:

output_dataset_one.metadata["привет"] = "там"

1 ответ

Недавно я делал аналогичные вещи, используя NamedTuple, например, компонент, который я создал для экспорта таблицы BigQuery в GCS, сохранял список экспортированных файлов и их URI в списке.

Это то, что вы ищете?

      @component(
    packages_to_install=[
        "google-cloud-bigquery",
        "google-cloud-storage",
        "google-auth",
        "google_cloud_pipeline_components",
    ],
    base_image="python:3.9",
)
def _export_bq_data_to_gcs(
    project_id: str,
    dataset_id: str,
    table_id: str,
    location: str,
    staging_bucket: str,
    pipeline_name: str,
    folder_name: str,
) -> NamedTuple("Outputs", destination_folder=str, data_files=List[str]):
    from google.cloud import bigquery, storage

    Outputs = NamedTuple("Outputs", destination_folder=str, data_files=str)

    print(f"project_id: {project_id}")

    bq_client = bigquery.Client(project=project_id)
    destination_folder = f"{staging_bucket}/{pipeline_name}/{folder_name}"
    destination_uri = f"{destination_folder}/data*.csv"

    dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
    table_ref = dataset_ref.table(table_id)

    extract_job = bq_client.extract_table(table_ref, destination_uri, location=location)
    extract_job.result()

    print(
        f"Successfully Exported {project_id}:{dataset_id}.{table_id} to {destination_uri}"
    )

    storage_client = storage.Client(project=project_id)
    blobs = storage_client.list_blobs(
        staging_bucket.replace("gs://", ""), prefix=f"{pipeline_name}/{folder_name}"
    )
    data_files = [f"{staging_bucket}/{blob.name}" for blob in blobs]

    print(f"Dataset files exported: {data_files}")

    return Outputs(destination_folder=destination_folder, data_files=data_files)
Другие вопросы по тегам