почему kubeflow в конвейерах вершин ai не хранит метаданные для артефакта набора данных
Я пытаюсь передать метаданные между компонентами функции Python, прикрепив к ним выходной артефакт в конвейере vertex ai kubeflow, из документации это кажется простым, но как бы я ни старался, я не могу заставить его работать. Я пытаюсь прикрепить строку к артефакту Output[Dataset] в одном компоненте, а затем использовать ее в следующем компоненте. Пример:
Этот конвейер состоит из двух компонентов: один для создания набора данных и прикрепления к нему метаданных, другой для получения артефакта набора данных и последующего доступа к метаданным.
Я пробовал с записью данных в файл и без нее.
from kfp.dsl import pipeline, component
from kfp.dsl import Input, Output, Dataset, Metrics, Model
from kfp import compiler, dsl
@component(packages_to_install=["pandas"], base_image='python:3.9')
def make_metadata(
data: Output[Dataset],
):
import pandas as pd
param_out_df = pd.DataFrame({"dummy_col": "dummy_row"}, index=[0])
param_out_df.to_csv(data.path, index=False)
data.metadata["data_num"] = 1
data.metadata["data_str"] = "random string"
@component(packages_to_install=["pandas"], base_image='python:3.9')
def use_metadata(
data: Input[Dataset],
):
print("data - metadata")
print(data.metadata)
@dsl.pipeline(
name='test-pipeline',
description='An example pipeline that performs arithmetic calculations.',
pipeline_root=f'{BUCKET}/pipelines'
)
def metadata_pipeline():
metadata_made = make_metadata()
used_metadata = use_metadata(data=metadata_made.outputs["data"])
PIPELINE_NAME = "test-pipeline"
PIPELINE_FILENAME = f"{PIPELINE_NAME}.yaml"
compiler.Compiler().compile(
pipeline_func=metadata_pipeline,
package_path=PIPELINE_FILENAME
Этот код запускает созданный выше файл yaml конвейера в вершине.
import datetime as datetime
from google.cloud import aiplatform
current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
test_run_name = f"{PIPELINE_NAME}_{current_time}"
aiplatform.init(project=PROJECT_ID, location=LOCATION, )
job = aiplatform.pipeline_jobs.PipelineJob(
display_name=test_run_name,
template_path=PIPELINE_FILENAME
)
job.run(sync=False)
Установленные пакеты kfp следующие:
kfp==2.0.0b13
kfp-pipeline-spec==0.2.0
kfp-server-api==2.0.0a6
Я не только не могу увидеть его в операторе печати, что бы я ни попробовал, он также не отобразится в области происхождения метаданных вершины ai (заменяет чувствительный на «xxx»
{
"name": "xxx",
"displayName": "data",
"instanceSchemaTitle": "system.Dataset",
"uri": "xxx",
"etag": "xxx",
"createTime": "2023-03-17T10:52:10.040Z",
"updateTime": "2023-03-17T10:53:01.621Z",
"state": "LIVE",
"schemaTitle": "system.Dataset",
"schemaVersion": "0.0.1",
"metadata": {}
}
Любая помощь будет очень признательна, я понимаю, что могу передавать данные другими способами, такими как OutputPath, но концептуально привязывать их к элементам предпочтительнее, поскольку метаданные относятся к этому элементу.
Я строго следовал этому руководству, оно тоже не работает:
Vertex AI Pipelines: облегченные компоненты Python на основе функций и компонентный ввод-вывод.
Как и выше, я не вижу метаданные, прикрепленные к компоненту предварительной обработки, когда смотрю на происхождение или пытаюсь получить к ним доступ в следующем компоненте:
output_dataset_one.metadata["привет"] = "там"
1 ответ
Недавно я делал аналогичные вещи, используя NamedTuple, например, компонент, который я создал для экспорта таблицы BigQuery в GCS, сохранял список экспортированных файлов и их URI в списке.
Это то, что вы ищете?
@component(
packages_to_install=[
"google-cloud-bigquery",
"google-cloud-storage",
"google-auth",
"google_cloud_pipeline_components",
],
base_image="python:3.9",
)
def _export_bq_data_to_gcs(
project_id: str,
dataset_id: str,
table_id: str,
location: str,
staging_bucket: str,
pipeline_name: str,
folder_name: str,
) -> NamedTuple("Outputs", destination_folder=str, data_files=List[str]):
from google.cloud import bigquery, storage
Outputs = NamedTuple("Outputs", destination_folder=str, data_files=str)
print(f"project_id: {project_id}")
bq_client = bigquery.Client(project=project_id)
destination_folder = f"{staging_bucket}/{pipeline_name}/{folder_name}"
destination_uri = f"{destination_folder}/data*.csv"
dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
table_ref = dataset_ref.table(table_id)
extract_job = bq_client.extract_table(table_ref, destination_uri, location=location)
extract_job.result()
print(
f"Successfully Exported {project_id}:{dataset_id}.{table_id} to {destination_uri}"
)
storage_client = storage.Client(project=project_id)
blobs = storage_client.list_blobs(
staging_bucket.replace("gs://", ""), prefix=f"{pipeline_name}/{folder_name}"
)
data_files = [f"{staging_bucket}/{blob.name}" for blob in blobs]
print(f"Dataset files exported: {data_files}")
return Outputs(destination_folder=destination_folder, data_files=data_files)