Проблема с определением DAG с помощью Airflow 2 Taskflow API

Я создал DAG с API потока задач airflow 2:

      with airflow.DAG("plot", schedule_interval=None, default_args=default_args) as dag:
    cf = collect_files()
    upi = update_process_info(cf)
    for i in range(0, max_parallel_plot_tasks):
        plot_files(cf, i, int(max_parallel_plot_tasks)) >> upi

Как мне избавиться от связи "collect_files" с "update_process_info" с помощью API Taskflow?

График:

С уважением Оли

1 ответ

Попробуйте что-нибудь вроде:

      with airflow.DAG("plot", schedule_interval=None, default_args=default_args) as dag:
    cf = collect_files()
    upi = None
    for i in range(0, max_parallel_plot_tasks):
        if not upi:
            upi = update_process_info(plot_files(cf, i, int(max_parallel_plot_tasks)))
        else:
            plot_files(cf, i, int(max_parallel_plot_tasks)) >> upi
Другие вопросы по тегам