Проблема с определением DAG с помощью Airflow 2 Taskflow API
Я создал DAG с API потока задач airflow 2:
with airflow.DAG("plot", schedule_interval=None, default_args=default_args) as dag:
cf = collect_files()
upi = update_process_info(cf)
for i in range(0, max_parallel_plot_tasks):
plot_files(cf, i, int(max_parallel_plot_tasks)) >> upi
Как мне избавиться от связи "collect_files" с "update_process_info" с помощью API Taskflow?
График:
С уважением Оли
1 ответ
Попробуйте что-нибудь вроде:
with airflow.DAG("plot", schedule_interval=None, default_args=default_args) as dag:
cf = collect_files()
upi = None
for i in range(0, max_parallel_plot_tasks):
if not upi:
upi = update_process_info(plot_files(cf, i, int(max_parallel_plot_tasks)))
else:
plot_files(cf, i, int(max_parallel_plot_tasks)) >> upi