Генерация динамических задач в потоке воздуха на основе выходных данных восходящей задачи

Как генерировать задачи динамически на основе списка, возвращенного из исходной задачи.

Я пробовал следующее:

Использование внешнего файла для записи и чтения из списка - эта опция работает, но я ищу более элегантное решение.

Xcom тянет внутри фабрики subdag. Это не работает. Я могу передать список из вышестоящей задачи в подпадацию, но этот xcom доступен только внутри задачи подзадачи и не может использоваться для циклического повторения / итерации по возвращенному списку и генерации задач. например, фабричный метод subdag.

 def subdag1(parent_dag_name, child_dag_name, default_args,**kwargs):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=default_args,
        schedule_interval="@once",
    )
    list_files='{{ task_instance.xcom_pull( dag_id="qqq",task_ids="push")}}'
    for newtask in list_files:
        BashOperator(
            task_id='%s-task-%s' % (child_dag_name,   'a'),
            default_args=default_args,
            bash_command= 'echo '+ list_files + newtask,
            dag=dag_subdag,
        )
    return dag_subdag

0 ответов

Другие вопросы по тегам