Как передавать данные или файлы между контейнерными компонентами Kubeflow в Python

Я изучаю Kubeflow как вариант развертывания и подключения различных компонентов типичного конвейера машинного обучения. Я использую контейнеры докеров в качестве компонентов Kubeflow, и пока мне не удалось успешно использоватьContainerOp.file_outputs объект для передачи результатов между компонентами.

Основываясь на моем понимании этой функции, создание и сохранение в файл, объявленный как один из file_outputs компонента, должно вызывать его сохранение и быть доступным для чтения следующим компонентом.

Вот как я попытался объявить это в своем коде Python для конвейера:

import kfp.dsl as dsl 
import kfp.gcp as gcp

@dsl.pipeline(name='kubeflow demo')
def pipeline(project_id='kubeflow-demo-254012'):
    data_collector = dsl.ContainerOp(
        name='data collector', 
        image='eu.gcr.io/kubeflow-demo-254012/data-collector',
        arguments=[ "--project_id", project_id ],
        file_outputs={ "output": '/output.txt' }
    )   
    data_preprocessor = dsl.ContainerOp(
        name='data preprocessor',
        image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
        arguments=[ "--project_id", project_id ]
    )
    data_preprocessor.after(data_collector)
    #TODO: add other components
if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(pipeline, __file__ + '.tar.gz')

В коде Python для data-collector.py компонент, я получаю набор данных, затем записываю его в output.txt. Я могу читать из файла в том же компоненте, но не внутриdata-preprocessor.py где я получаю FileNotFoundError.

Использование file_outputsнедопустим для контейнерных компонентов Kubeflow или я неправильно использую его в своем коде? Если в моем случае это не вариант, можно ли программно создавать тома Kubernetes внутри кода Python объявления конвейера и использовать их вместоfile_outputs?

3 ответа

Решение

Файлы, созданные в одном компоненте конвейера Kubeflow, являются локальными для контейнера. Чтобы ссылаться на него на следующих шагах, вам нужно будет передать его как:

data_preprocessor = dsl.ContainerOp(
        name='data preprocessor',
        image='eu.gcr.io/kubeflow-demo-254012/data-preprocessor',
        arguments=["--fetched_dataset", data_collector.outputs['output'],
                   "--project_id", project_id,
                  ]

Примечание: data_collector.outputs['output'] будет содержать фактическое строковое содержимое файла /output.txt(не путь к файлу). Если вы хотите, чтобы он содержал путь к файлу, вам необходимо записать набор данных в общее хранилище (например, s3 или смонтированный том PVC) и записать путь / ссылку на общее хранилище в /output.txt. data_preprocessor затем можно прочитать набор данных на основе пути.

Есть три основных шага:

  1. сохраните файл outputs.txt, который будет включать данные / параметр / все, что вы хотите передать следующему компоненту.Примечание: он должен быть на корневом уровне, т.е. /output.txt

  2. передать file_outputs={'output': '/output.txt'} в качестве аргументов, как показано в качестве примера.

  3. внутри контейнера container_op, который вы напишете внутри аргумента передачи dsl.pipeline (для соответствующего аргумента компонента, которому требуется вывод от более раннего компонента) как comp1.output (здесь comp1 - это 1-й компонент, который производит вывод и сохраняет его в /output.txt)

import kfp
from kfp import dsl

def SendMsg(
    send_msg: str = 'akash'
):
    return dsl.ContainerOp(
        name = 'Print msg', 
        image = 'docker.io/akashdesarda/comp1:latest', 
        command = ['python', 'msg.py'],
        arguments=[
            '--msg', send_msg
        ],
        file_outputs={
            'output': '/output.txt',
        }
    )

def GetMsg(
    get_msg: str
):
    return dsl.ContainerOp(
        name = 'Read msg from 1st component',
        image = 'docker.io/akashdesarda/comp2:latest',
        command = ['python', 'msg.py'],
        arguments=[
            '--msg', get_msg
        ]
    )

@dsl.pipeline(
    name = 'Pass parameter',
    description = 'Passing para')
def  passing_parameter(send_msg):
    comp1 = SendMsg(send_msg)
    comp2 = GetMsg(comp1.output)


if __name__ == '__main__':
  import kfp.compiler as compiler
  compiler.Compiler().compile(passing_parameter, __file__ + '.tar.gz')

Вам не нужно записывать данные в общее хранилище, вы можете использовать kfp.dsl.InputArgumentPathдля передачи вывода из функции python на вход операции контейнера.

      @kfp.dsl.pipeline(
  name='Build Model Server Pipeline',
  description='Build a kserve model server pipeline.'
)
def build_model_server_pipeline(s3_src_path):
    download_s3_files_task = download_archive_step(s3_src_path)

    tarball_path = "/tmp/artifact.tar"
    artifact_tarball = kfp.dsl.InputArgumentPath(download_s3_files_task.outputs['output_tarball'], path=tarball_path)
    
    build_container = kfp.dsl.ContainerOp(name ='build_container',
                                          image ='python:3.8',
                                          command=['sh', '-c'],
                                          arguments=[
                                              'ls -l ' + tarball_path + ';'
                                          ],
                                          artifact_argument_paths=[artifact_tarball],
                                         )
Другие вопросы по тегам