Как передавать данные или файлы между контейнерными компонентами Kubeflow в Python
Я изучаю Kubeflow как вариант развертывания и подключения различных компонентов типичного конвейера машинного обучения. Я использую контейнеры докеров в качестве компонентов Kubeflow, и пока мне не удалось успешно использоватьContainerOp.file_outputs
объект для передачи результатов между компонентами.
Основываясь на моем понимании этой функции, создание и сохранение в файл, объявленный как один из file_outputs
компонента, должно вызывать его сохранение и быть доступным для чтения следующим компонентом.
Вот как я попытался объявить это в своем коде Python для конвейера:
import kfp.dsl as dsl
import kfp.gcp as gcp
@dsl.pipeline(name='kubeflow demo')
def pipeline(project_id='kubeflow-demo-254012'):
data_collector = dsl.ContainerOp(
name='data collector',
image='eu.gcr.io/kubeflow-demo-254012/data-collector',
arguments=[ "--project_id", project_id ],
file_outputs={ "output": '/output.txt' }
)
data_preprocessor = dsl.ContainerOp(
name='data preprocessor',
image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
arguments=[ "--project_id", project_id ]
)
data_preprocessor.after(data_collector)
#TODO: add other components
if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline, __file__ + '.tar.gz')
В коде Python для data-collector.py
компонент, я получаю набор данных, затем записываю его в output.txt
. Я могу читать из файла в том же компоненте, но не внутриdata-preprocessor.py
где я получаю FileNotFoundError
.
Использование file_outputs
недопустим для контейнерных компонентов Kubeflow или я неправильно использую его в своем коде? Если в моем случае это не вариант, можно ли программно создавать тома Kubernetes внутри кода Python объявления конвейера и использовать их вместоfile_outputs
?
3 ответа
Файлы, созданные в одном компоненте конвейера Kubeflow, являются локальными для контейнера. Чтобы ссылаться на него на следующих шагах, вам нужно будет передать его как:
data_preprocessor = dsl.ContainerOp(
name='data preprocessor',
image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
arguments=["--fetched_dataset", data_collector.outputs['output'],
"--project_id", project_id,
]
Примечание: data_collector.outputs['output']
будет содержать фактическое строковое содержимое файла /output.txt
(не путь к файлу). Если вы хотите, чтобы он содержал путь к файлу, вам необходимо записать набор данных в общее хранилище (например, s3 или смонтированный том PVC) и записать путь / ссылку на общее хранилище в /output.txt
. data_preprocessor
затем можно прочитать набор данных на основе пути.
Есть три основных шага:
сохраните файл outputs.txt, который будет включать данные / параметр / все, что вы хотите передать следующему компоненту.Примечание: он должен быть на корневом уровне, т.е. /output.txt
передать file_outputs={'output': '/output.txt'} в качестве аргументов, как показано в качестве примера.
внутри контейнера container_op, который вы напишете внутри аргумента передачи dsl.pipeline (для соответствующего аргумента компонента, которому требуется вывод от более раннего компонента) как comp1.output (здесь comp1 - это 1-й компонент, который производит вывод и сохраняет его в /output.txt)
import kfp
from kfp import dsl
def SendMsg(
send_msg: str = 'akash'
):
return dsl.ContainerOp(
name = 'Print msg',
image = 'docker.io/akashdesarda/comp1:latest',
command = ['python', 'msg.py'],
arguments=[
'--msg', send_msg
],
file_outputs={
'output': '/output.txt',
}
)
def GetMsg(
get_msg: str
):
return dsl.ContainerOp(
name = 'Read msg from 1st component',
image = 'docker.io/akashdesarda/comp2:latest',
command = ['python', 'msg.py'],
arguments=[
'--msg', get_msg
]
)
@dsl.pipeline(
name = 'Pass parameter',
description = 'Passing para')
def passing_parameter(send_msg):
comp1 = SendMsg(send_msg)
comp2 = GetMsg(comp1.output)
if __name__ == '__main__':
import kfp.compiler as compiler
compiler.Compiler().compile(passing_parameter, __file__ + '.tar.gz')
Вам не нужно записывать данные в общее хранилище, вы можете использовать
kfp.dsl.InputArgumentPath
для передачи вывода из функции python на вход операции контейнера.
@kfp.dsl.pipeline(
name='Build Model Server Pipeline',
description='Build a kserve model server pipeline.'
)
def build_model_server_pipeline(s3_src_path):
download_s3_files_task = download_archive_step(s3_src_path)
tarball_path = "/tmp/artifact.tar"
artifact_tarball = kfp.dsl.InputArgumentPath(download_s3_files_task.outputs['output_tarball'], path=tarball_path)
build_container = kfp.dsl.ContainerOp(name ='build_container',
image ='python:3.8',
command=['sh', '-c'],
arguments=[
'ls -l ' + tarball_path + ';'
],
artifact_argument_paths=[artifact_tarball],
)