Как получить идентификатор запуска из компонента?
Я провожу некоторые эксперименты с Kubeflow и мне интересно получить идентификатор запуска для сохранения вместе с некоторыми метаданными о выполнении конвейера. Есть ли способ сделать это из компонента, такого как ContainerOp
?
6 ответов
Я пытался сделать это, используя DSL в Python, но сейчас это невозможно.
Единственный вариант, который я нашел, - это использовать метод, который они использовали в этом примере кода. Вы в основном объявляете строку, содержащую {{workflow.uid}}
, Он будет заменен фактическим значением во время выполнения.
Вы также можете сделать это, чтобы получить имя модуля, это было бы {{pod.name}}
,
Вы можете использовать kfp.dsl.EXECUTION_ID_PLACEHOLDER
а также kfp.dsl.RUN_ID_PLACEHOLDER
в качестве аргументов для вашего компонента. Во время выполнения они будут заменены фактическими значениями.
Поскольку конвейер kubeflow полагается на argo, вы можете использовать переменную argo, чтобы получить то, что вы хотите.
Например,
@func_to_container_op
def dummy(run_id, run_name) -> str:
return run_id, run_name
@dsl.pipeline(
name='test_pipeline',
)
def test_pipeline():
dummy('{{workflow.labels.pipeline/runid}}', '{{workflow.annotations.pipelines.kubeflow.org/run_name}}')
Вы обнаружите, что заполнители будут заменены правильными run_id и run_name.
Дополнительные переменные argo: https://github.com/argoproj/argo-workflows/blob/master/docs/variables.md
Чтобы узнать, что записано в метках и аннотациях при запуске конвейера kubeflow, просто получите соответствующий рабочий процесс от k8s.
kubectl get workflow/XXX -oyaml
Контейнер вашего компонента будет иметь переменную среды с именем HOSTNAME
которому присвоено уникальное имя модуля, из которого вы получаете все необходимые метаданные.
Для версии 1:
Кажется, чтоkfp.dsl.EXECUTION_ID_PLACEHOLDER
(идентификатор запуска компонента) иkfp.dsl.RUN_ID_PLACEHOLDER
(идентификатор запуска конвейера) выполнит то, что вы просите.
Для версии 2:
kfp.v2.dsl.PIPELINE_JOB_ID_PLACEHOLDER
иkfp.v2.dsl.PIPELINE_TASK_ID_PLACEHOLDER
соответственно.
create_run_from_pipeline_func возвращает RunPipelineResult, у которого есть атрибут run_id
client = kfp.Client(host)
result = client.create_run_from_pipeline_func(…)
result.run_id