Как обновить префект Flow с повторяющимися параметрами?

Используя префект, я хотел бы создать новый поток из двух других потоков.

Я получаю ошибку A task with the slug "add_num" already exists in this flow. Можно ли обновить Flows которые используют то же самое tasks или Parameters. Ниже приведен минимальный пример того, что я пытаюсь выполнить. `

from prefect import task, Flow, Parameter

@task
def add_one(x):
    return x+1

with Flow("Flow 1") as flow_1:
    add_num = Parameter("add_num", default=10)
    new_num1 = add_one(add_num)

@task
def add_two(y):
    return y+1

with Flow("Flow 2") as flow_2:
   add_num = Parameter("add_num", default=10)
   new_num2 = add_two(add_num)

 combo_fl = Flow("Add Numbers")

 combo_fl.update(flow_1)
 combo_fl.update(flow_2, validate=False)

Я видел этот фрагмент кода на канале резервной копии, который может быть связан с решением этой проблемы, но я не уверен, как его использовать.

class GlobalParameter(Parameter):
    def __init__(self, name, slug=None, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.slug = slug or uuid.uuid4()

Заранее спасибо.

1 ответ

Решение

Поскольку параметры однозначно идентифицируются по имени в API, вы не можете объединить два потока, которые имеют разные параметры с одним и тем же именем. Однако вы можете использовать общий параметр в каждом потоке следующим образом:

from prefect import task, Flow, Parameter

## initialize the Parameter outside of any
## Flow context

add_num = Parameter("add_num", default=10)

@task
def add_one(x):
    return x+1

with Flow("Flow 1") as flow_1:
    new_num1 = add_one(add_num)

@task
def add_two(y):
    return y+1

with Flow("Flow 2") as flow_2:
   new_num2 = add_two(add_num)

combo_fl = Flow("Add Numbers")

combo_fl.update(flow_1)
combo_fl.update(flow_2, validate=False)

Поскольку используемый параметр фактически является тем же экземпляром класса Parameter, ваше обновление будет успешным.

Другие вопросы по тегам