динамический выходной список конвейера kubeflow в качестве входного параметра

Я использую ParallelFor вместо динамического списка. Я хочу собрать все выходные данные цикла и передать их другому ContainerOp.
Что-то вроде следующего, что явно не работает, так какoutputs список будет статическим.

with dsl.ParallelFor(op1.output) as item:
    op2 = dsl.ContainerOp(
      name='op2',
      ...
      file_outputs={
         'outputs': '/outputs.json',
    })
    outputs.append(op2.output)


op3 = dsl.ContainerOp(
   name='op3',
   ...
   arguments=['--input': outputs]  # won't work
)

5 ответов

К сожалению, решение Арк-куна у меня не работает. Но есть простой способ реализовать рабочий процесс разветвления, если мы заранее знаем количество входов. Мы можем предварительно рассчитать DAG конвейера следующим образом:

@kfp.components.create_component_from_func
def my_transformer_op(item: str) -> str:
    return item + "_NEW"


@kfp.components.create_component_from_func
def my_aggregator_op(items: list) -> str:
    return "HELLO"


def pipeline(array_of_arguments):
    @dsl.pipeline(PIPELINE_NAME, PIPELINE_DESCRIPTION)
    def dynamic_pipeline():
        outputs = []
        for i in array_of_arguments:
            outputs.append(my_transformer_op(str(i)).output)
        my_aggregator_op(outputs)
    return dynamic_pipeline

...

    run_id = client.create_run_from_pipeline_func(
        pipeline(data_samples_chunks), {},
        run_name=PIPELINE_RUN,
        experiment_name=PIPELINE_EXPERIMENT).run_id

Я столкнулся с проблемами с динамическим "разветвлением", а затем "разветвлением" и с Kubeflow Pipelines. Может быть, немного неуклюже, но я использовал навесной ПВХ, чтобы преодолеть это.

Kubeflow позволяет монтировать известный PVC или создавать новый на лету, используя VolumeOp(ссылка здесь). В этом фрагменте показано, как использовать известный PVC.

    pvc_name = '<available-pvc-name>' 
    pvc_volume_name = '<pvc-uuid>' # pass the pvc uuid here

    # Op 1 creates a list to iterate over
    op_1 = dsl.ContainerOp(
            name='echo',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=['echo "[1,2,3]"> /tmp/output.txt'],
            file_outputs={'output': '/tmp/output.txt'})

    # Using withParam here to iterate over the results from op1
    # and writing the results of each step to its own PVC
    with dsl.ParallelFor(op_1.output) as item:
        op_2 = dsl.ContainerOp(
            name='iterate',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=[f"echo item-{item} > /tmp/output.txt; "  # <- write to output  
                       f"mkdir -p /mnt/{{workflow.uid}}; "  # <- make a dir under /mnt
                       f"echo item-{item}\n >> /mnt/{{workflow.uid}}"],  # <- append results from each step to the PVC
            file_outputs={'output': '/tmp/output.txt'},
            # mount the PVC
            pvolumes={"/mnt": dsl.PipelineVolume(pvc=pvc_name, name=pvc_volume_name)})

    op_3 = dsl.ContainerOp(
            name='echo',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=[f"echo /mnt/{{workflow.uid}} > /tmp/output.txt"],
            # mount the PVC again to use
            pvolumes={"/mnt": dsl.PipelineVolume(pvc=pvc_name, name=pvc_volume_name)},
            file_outputs={'output': '/tmp/output_2.txt'}).after(op_2)

Гарантировать, что op_3 бежит за петлями из op_2 с помощью after(op_2) в конце.

Примечание: это может быть деспотичный подход, и могут быть лучшие решения, если KFP разрешит это как часть компилятора KF, но я не мог заставить его работать. Если создать PVC в env легко, это может сработать в вашем случае.

Это работает только в том случае, если вы заранее знаете входы/выходы, поэтому это не действительно динамично. Но это действительно решило для моего текущего варианта использования.

На самом деле мне было очень сложно управлять конвейерами таким образом, я рекомендую взглянуть на Ploomber https://github.com/ploomber/ploomber/?ref=stacko .

Очень легко установить конвейеры и зависимости, и он интегрируется с большинством поставщиков (airflow, argo и т. д.). Я знаю, что сейчас они работают над соединителем Kubeflow (похожим на Kale, но проще). Во всяком случае, это облегчило мне жизнь.

Проблема в том, что op3 неправильно ссылается на вывод op2в качестве входного аргумента. Попробуй это:

op3 = dsl.ContainerOp(
    ...
    arguments=['--input': op2.outputs['outputs']]
)

Что-то вроде следующего, что явно не работает, поскольку список выходов будет статическим.

Это действительно будет работать, если вы используете правильные компоненты вместо того, чтобы вручную создавать ContainerOpс. Конечно, выходы должны быть небольшими.

@kfp.components.create_component_from_func
def my_producer_op(num: int) -> list:
  return list(range(num))

@kfp.components.create_component_from_func
def my_transformer_op(item: int) -> float:
  return item * 3.14

@kfp.components.create_component_from_func
def my_aggregator(numbers: list) -> float:
  return sum(float(n) for n in numbers)

def my_pipeline():
  nums = my_producer_op().output

  transformed_nums = []
  with dsl.ParallelFor(nums) as item:
      transformed_num = my_transformer_op(item).output
      transformed_nums.append(transformed_num)

  sum = my_aggregator(numbers=transformed_nums).output

PS Так же можно вручную использовать json.dumps([str(output) for output in outputs)для сериализации списка выходных данных (строк PipelineParams) в JSON. Хакерский и официально не поддерживается.

Другие вопросы по тегам