Как в Kubeflow Pipelines отправить список элементов в облегченный компонент Python?

Я пытаюсь отправить список элементов в виде PipelineParameter легковесному компоненту.
Вот образец, который воспроизводит проблему. Вот функция:

def my_func(my_list: list) -> bool:
    print(f'my_list is {my_list}')
    print(f'my_list is of type {type(my_list)}')
    print(f'elem 0 is {my_list[0]}')
    print(f'elem 1 is {my_list[1]}')
    return True

И если я выполню это так:

test_data = ['abc', 'def']
my_func(test_data)

Он ведет себя так, как ожидалось:

my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def

но если я заверну его в op и настрою конвейер:

import kfp

my_op = kfp.components.func_to_container_op(my_func)

@kfp.dsl.pipeline()
def my_pipeline(my_list: kfp.dsl.PipelineParam = kfp.dsl.PipelineParam('my_list', param_type=kfp.dsl.types.List())):
    my_op(my_list)

kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')

А затем запустите конвейер:

client = kfp.Client()
experiment = client.create_experiment('Default')
client.run_pipeline(experiment.id, 'my job', 'my_pipeline.zip', params={'my_list': test_data})

Тогда кажется, что в какой-то момент мой список был преобразован в строку!

my_list is ['abc', 'def']
my_list is of type <class 'str'>
elem 0 is [
elem 1 is '

2 ответа

Решение

Вот обнаруженный мной обходной путь - сериализация аргументов в виде строки json. Не уверен, что это действительно лучший способ...

Голая функция становится:

def my_func(json_arg_str: str) -> bool:
    import json
    args = json.loads(json_arg_str)
    my_list = args['my_list']
    print(f'my_list is {my_list}')
    print(f'my_list is of type {type(my_list)}')
    print(f'elem 0 is {my_list[0]}')
    print(f'elem 1 is {my_list[1]}')
    return True

Что по-прежнему работает, пока вы передаете аргументы в виде строки json вместо списка:

test_data = '{"my_list":["abc", "def"]}'my_func(test_data)

Что дает ожидаемые результаты:

my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def

И теперь конвейер изменен, чтобы принять str вместо PipelineParam типа kfp.dsl.types.List:

import kfp 

my_op = kfp.components.func_to_container_op(my_func)

@kfp.dsl.pipeline()
def my_pipeline(json_arg_str: str):
    my_op(json_arg_str)

kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')

Что при таком исполнении:

client = kfp.Client()
experiment = client.create_experiment('Default')
client.run_pipeline(experiment.id, 'my job', 'my_pipeline.zip', params={'json_arg_str': test_data})

Дает тот же результат:

my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def

Хотя это работает, я все же считаю этот обходной путь раздражающим. В чем тогда смысл kfp.dsl.types.List, если не для разрешения PipelineParam, который является списком?

В настоящее время лучшим вариантом кажется сериализация аргументов. С этим связана одна проблема: https://github.com/kubeflow/pipelines/issues/1901

Другие вопросы по тегам