Как обновить префект Flow с повторяющимися параметрами?
Используя префект, я хотел бы создать новый поток из двух других потоков.
Я получаю ошибку A task with the slug "add_num" already exists in this flow.
Можно ли обновить Flows
которые используют то же самое tasks
или Parameters
. Ниже приведен минимальный пример того, что я пытаюсь выполнить. `
from prefect import task, Flow, Parameter
@task
def add_one(x):
return x+1
with Flow("Flow 1") as flow_1:
add_num = Parameter("add_num", default=10)
new_num1 = add_one(add_num)
@task
def add_two(y):
return y+1
with Flow("Flow 2") as flow_2:
add_num = Parameter("add_num", default=10)
new_num2 = add_two(add_num)
combo_fl = Flow("Add Numbers")
combo_fl.update(flow_1)
combo_fl.update(flow_2, validate=False)
Я видел этот фрагмент кода на канале резервной копии, который может быть связан с решением этой проблемы, но я не уверен, как его использовать.
class GlobalParameter(Parameter):
def __init__(self, name, slug=None, *args, **kwargs):
super().__init__(name=name, *args, **kwargs)
self.slug = slug or uuid.uuid4()
Заранее спасибо.
1 ответ
Поскольку параметры однозначно идентифицируются по имени в API, вы не можете объединить два потока, которые имеют разные параметры с одним и тем же именем. Однако вы можете использовать общий параметр в каждом потоке следующим образом:
from prefect import task, Flow, Parameter
## initialize the Parameter outside of any
## Flow context
add_num = Parameter("add_num", default=10)
@task
def add_one(x):
return x+1
with Flow("Flow 1") as flow_1:
new_num1 = add_one(add_num)
@task
def add_two(y):
return y+1
with Flow("Flow 2") as flow_2:
new_num2 = add_two(add_num)
combo_fl = Flow("Add Numbers")
combo_fl.update(flow_1)
combo_fl.update(flow_2, validate=False)
Поскольку используемый параметр фактически является тем же экземпляром класса Parameter, ваше обновление будет успешным.