Как в Kubeflow Pipelines отправить список элементов в облегченный компонент Python?
Я пытаюсь отправить список элементов в виде PipelineParameter легковесному компоненту.
Вот образец, который воспроизводит проблему. Вот функция:
def my_func(my_list: list) -> bool:
print(f'my_list is {my_list}')
print(f'my_list is of type {type(my_list)}')
print(f'elem 0 is {my_list[0]}')
print(f'elem 1 is {my_list[1]}')
return True
И если я выполню это так:
test_data = ['abc', 'def']
my_func(test_data)
Он ведет себя так, как ожидалось:
my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def
но если я заверну его в op и настрою конвейер:
import kfp
my_op = kfp.components.func_to_container_op(my_func)
@kfp.dsl.pipeline()
def my_pipeline(my_list: kfp.dsl.PipelineParam = kfp.dsl.PipelineParam('my_list', param_type=kfp.dsl.types.List())):
my_op(my_list)
kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')
А затем запустите конвейер:
client = kfp.Client()
experiment = client.create_experiment('Default')
client.run_pipeline(experiment.id, 'my job', 'my_pipeline.zip', params={'my_list': test_data})
Тогда кажется, что в какой-то момент мой список был преобразован в строку!
my_list is ['abc', 'def']
my_list is of type <class 'str'>
elem 0 is [
elem 1 is '
2 ответа
Вот обнаруженный мной обходной путь - сериализация аргументов в виде строки json. Не уверен, что это действительно лучший способ...
Голая функция становится:
def my_func(json_arg_str: str) -> bool:
import json
args = json.loads(json_arg_str)
my_list = args['my_list']
print(f'my_list is {my_list}')
print(f'my_list is of type {type(my_list)}')
print(f'elem 0 is {my_list[0]}')
print(f'elem 1 is {my_list[1]}')
return True
Что по-прежнему работает, пока вы передаете аргументы в виде строки json вместо списка:
test_data = '{"my_list":["abc", "def"]}'my_func(test_data)
Что дает ожидаемые результаты:
my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def
И теперь конвейер изменен, чтобы принять str
вместо PipelineParam
типа kfp.dsl.types.List
:
import kfp
my_op = kfp.components.func_to_container_op(my_func)
@kfp.dsl.pipeline()
def my_pipeline(json_arg_str: str):
my_op(json_arg_str)
kfp.compiler.Compiler().compile(my_pipeline, 'my_pipeline.zip')
Что при таком исполнении:
client = kfp.Client()
experiment = client.create_experiment('Default')
client.run_pipeline(experiment.id, 'my job', 'my_pipeline.zip', params={'json_arg_str': test_data})
Дает тот же результат:
my_list is ['abc', 'def']
my_list is of type <class 'list'>
elem 0 is abc
elem 1 is def
Хотя это работает, я все же считаю этот обходной путь раздражающим. В чем тогда смысл kfp.dsl.types.List, если не для разрешения PipelineParam, который является списком?
В настоящее время лучшим вариантом кажется сериализация аргументов. С этим связана одна проблема: https://github.com/kubeflow/pipelines/issues/1901