Как получить идентификатор запуска из компонента?

Я провожу некоторые эксперименты с Kubeflow и мне интересно получить идентификатор запуска для сохранения вместе с некоторыми метаданными о выполнении конвейера. Есть ли способ сделать это из компонента, такого как ContainerOp?

6 ответов

Решение

Я пытался сделать это, используя DSL в Python, но сейчас это невозможно.

Единственный вариант, который я нашел, - это использовать метод, который они использовали в этом примере кода. Вы в основном объявляете строку, содержащую {{workflow.uid}}, Он будет заменен фактическим значением во время выполнения.

Вы также можете сделать это, чтобы получить имя модуля, это было бы {{pod.name}},

Вы можете использовать kfp.dsl.EXECUTION_ID_PLACEHOLDER а также kfp.dsl.RUN_ID_PLACEHOLDERв качестве аргументов для вашего компонента. Во время выполнения они будут заменены фактическими значениями.

Поскольку конвейер kubeflow полагается на argo, вы можете использовать переменную argo, чтобы получить то, что вы хотите.

Например,

      @func_to_container_op
def dummy(run_id, run_name) -> str:
    return run_id, run_name

@dsl.pipeline(
    name='test_pipeline',
)
def test_pipeline():
  dummy('{{workflow.labels.pipeline/runid}}', '{{workflow.annotations.pipelines.kubeflow.org/run_name}}')

Вы обнаружите, что заполнители будут заменены правильными run_id и run_name.

Дополнительные переменные argo: https://github.com/argoproj/argo-workflows/blob/master/docs/variables.md

Чтобы узнать, что записано в метках и аннотациях при запуске конвейера kubeflow, просто получите соответствующий рабочий процесс от k8s.

      kubectl get workflow/XXX -oyaml

Контейнер вашего компонента будет иметь переменную среды с именем HOSTNAME которому присвоено уникальное имя модуля, из которого вы получаете все необходимые метаданные.

Для версии 1:

Кажется, чтоkfp.dsl.EXECUTION_ID_PLACEHOLDER(идентификатор запуска компонента) иkfp.dsl.RUN_ID_PLACEHOLDER(идентификатор запуска конвейера) выполнит то, что вы просите.

источник

Для версии 2:

kfp.v2.dsl.PIPELINE_JOB_ID_PLACEHOLDERиkfp.v2.dsl.PIPELINE_TASK_ID_PLACEHOLDERсоответственно.

источник

create_run_from_pipeline_func возвращает RunPipelineResult, у которого есть атрибут run_id

      client = kfp.Client(host)
result = client.create_run_from_pipeline_func(…) 
result.run_id
Другие вопросы по тегам